diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 75f928b69d..667f436a89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -196,6 +196,10 @@ private HiveTxnManager queryTxnMgr; private RuntimeStatsSource runtimeStatsSource; + // Boolean to store information about whether valid txn list was generated + // for current query. + private boolean validTxnListsGenerated; + private CacheUsage cacheUsage; private CacheEntry usedCacheEntry; @@ -613,6 +617,9 @@ public void run() { tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree); sem = SemanticAnalyzerFactory.get(queryState, tree); openTransaction(); + // TODO: Lock acquisition should be moved before this method call + // when we want to implement lock-based concurrency control + generateValidTxnList(); sem.analyze(tree, ctx); hookCtx.update(sem); @@ -620,6 +627,9 @@ public void run() { } else { sem = SemanticAnalyzerFactory.get(queryState, tree); openTransaction(); + // TODO: Lock acquisition should be moved before this method call + // when we want to implement lock-based concurrency control + generateValidTxnList(); sem.analyze(tree, ctx); } LOG.info("Semantic Analysis Completed"); @@ -769,6 +779,24 @@ private void openTransaction() throws LockException, CommandProcessorResponse { } } + private void generateValidTxnList() throws LockException { + // Record current valid txn list that will be used throughout the query + // compilation and processing. We only do this if 1) a transaction + // was already opened and 2) the list has not been recorded yet, + // e.g., by an explicit open transaction command. + validTxnListsGenerated = false; + String currentTxnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + if (queryTxnMgr.isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) { + try { + recordValidTxns(queryTxnMgr); + validTxnListsGenerated = true; + } catch (LockException e) { + LOG.error("Exception while acquiring valid txn list", e); + throw e; + } + } + } + private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { boolean shouldOpenImplicitTxn = !ctx.isExplainPlan(); //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 @@ -1360,8 +1388,10 @@ private void acquireLocks() throws CommandProcessorResponse { /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); - if (queryTxnMgr.recordSnapshot(plan)) { - recordValidTxns(queryTxnMgr); + // This check is for controlling the correctness of the current state + if (queryTxnMgr.recordSnapshot(plan) && !validTxnListsGenerated) { + throw new IllegalStateException("calling recordValidTxn() more than once in the same " + + JavaUtils.txnIdToString(queryTxnMgr.getCurrentTxnId())); } if (plan.hasAcidResourcesInQuery()) { recordValidWriteIds(queryTxnMgr); @@ -1519,11 +1549,21 @@ public CommandProcessorResponse run(String command, boolean alreadyCompiled) { @Override public CommandProcessorResponse compileAndRespond(String command) { + return compileAndRespond(command, false); + } + + public CommandProcessorResponse compileAndRespond(String command, boolean cleanupTxnList) { try { compileInternal(command, false); return createProcessorResponse(0); } catch (CommandProcessorResponse e) { return e; + } finally { + if (cleanupTxnList) { + // Valid txn list might be generated for a query compiled using this + // command, thus we need to reset it + conf.unset(ValidTxnList.VALID_TXNS_KEY); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index ba0d56ae37..4c86fb8937 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -317,6 +317,7 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli } String validWriteIdString = driverConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY); if (validWriteIdString != null) { + assert validTxnString != null; jc.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, validWriteIdString); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index efc0cfe940..39f40b1b85 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -126,7 +126,7 @@ public void testMetadataOperationLocks() throws Exception { conf.setBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE, false); dropTable(new String[] {"T"}); checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)")); - checkCmdOnDriver(driver.compileAndRespond("insert into T values (1,2)")); + checkCmdOnDriver(driver.compileAndRespond("insert into T values (1,2)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -137,7 +137,7 @@ public void testMetadataOperationLocks() throws Exception { //simulate concurrent session HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("alter table T SET TBLPROPERTIES ('transactional'='true')")); + checkCmdOnDriver(driver.compileAndRespond("alter table T SET TBLPROPERTIES ('transactional'='true')", true)); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -155,7 +155,7 @@ public void testLocksInSubquery() throws Exception { checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); - checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); + checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -163,7 +163,7 @@ public void testLocksInSubquery() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks); txnMgr.rollbackTxn(); - checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); + checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -171,7 +171,7 @@ public void testLocksInSubquery() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks); txnMgr.rollbackTxn(); - checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); + checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -183,7 +183,7 @@ public void testLocksInSubquery() throws Exception { @Test public void createTable() throws Exception { dropTable(new String[] {"T"}); - CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)"); + CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -199,7 +199,7 @@ public void insertOverwriteCreate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T3(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2"); + cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -220,7 +220,7 @@ public void insertOverwritePartitionedCreate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T5(name string, age int, gpa double)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5"); + cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -239,13 +239,13 @@ public void basicBlocking() throws Exception { dropTable(new String[] {"T6"}); CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select a from T6"); + cpr = driver.compileAndRespond("select a from T6", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 List selectLocks = ctx.getHiveLocks(); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table if exists T6"); + cpr = driver.compileAndRespond("drop table if exists T6", true); checkCmdOnDriver(cpr); //tries to get X lock on T1 and gets Waiting state LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); @@ -272,12 +272,12 @@ public void lockConflictDbTable() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");//gets SS lock on T7 + cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6", true);//gets SS lock on T7 checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); + checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp", true)); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -295,16 +295,16 @@ public void updateSelectUpdate() throws Exception { dropTable(new String[] {"T8"}); CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("delete from T8 where b = 89"); + cpr = driver.compileAndRespond("delete from T8 where b = 89", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); checkCmdOnDriver(driver.run("start transaction")); - cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8 + cpr = driver.compileAndRespond("select a from T8", true);//gets S lock on T8 checkCmdOnDriver(cpr); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); - checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); + checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1", true)); ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -332,7 +332,7 @@ public void testLockRetryLimit() throws Exception { conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true); CommandProcessorResponse cpr = driver.run("create table T9(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select * from T9"); + cpr = driver.compileAndRespond("select * from T9", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Vincent Vega"); List locks = getLocks(); @@ -340,7 +340,7 @@ public void testLockRetryLimit() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table T9"); + cpr = driver.compileAndRespond("drop table T9", true); checkCmdOnDriver(cpr); try { txnMgr2.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield"); @@ -366,7 +366,7 @@ public void testLockBlockedBy() throws Exception { dropTable(new String[] {"TAB_BLOCKED"}); CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select * from TAB_BLOCKED"); + cpr = driver.compileAndRespond("select * from TAB_BLOCKED", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM"); List locks = getLocks(); @@ -374,7 +374,7 @@ public void testLockBlockedBy() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table TAB_BLOCKED"); + cpr = driver.compileAndRespond("drop table TAB_BLOCKED", true); checkCmdOnDriver(cpr); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking locks = getLocks(); @@ -396,22 +396,22 @@ public void testDummyTxnManagerOnAcidTable() throws Exception { // All DML should fail with DummyTxnManager on ACID table useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("select * from T10"); + cpr = driver.compileAndRespond("select * from T10", true); Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("insert into table T10 values (1, 2)"); + cpr = driver.compileAndRespond("insert into table T10 values (1, 2)", true); Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("update T10 set a=0 where b=1"); + cpr = driver.compileAndRespond("update T10 set a=0 where b=1", true); Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("delete from T10"); + cpr = driver.compileAndRespond("delete from T10", true); Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); @@ -632,7 +632,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)"); + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)", true); checkCmdOnDriver(cpr); LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); List locks = getLocks(); @@ -641,7 +641,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn();; - cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)"); + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -650,7 +650,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)"); + cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -659,7 +659,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)"); + cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -668,7 +668,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1"); + cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -676,7 +676,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1"); + cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -697,7 +697,7 @@ public void checkExpectedLocks2() throws Exception { checkCmdOnDriver(driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')")); checkCmdOnDriver(driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')")); txnMgr.openTxn(ctx, "T1"); - checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na")); + checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 6, locks.size()); @@ -710,7 +710,7 @@ public void checkExpectedLocks2() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "T2"); - checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)")); + checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)", true)); LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 8, locks.size()); @@ -728,7 +728,7 @@ public void checkExpectedLocks2() throws Exception { conf.setBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE, false); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr3.openTxn(ctx, "T3"); - checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='blah') values(7,8)")); + checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='blah') values(7,8)", true)); ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr3); Assert.assertEquals("Unexpected lock count", 10, locks.size()); @@ -804,31 +804,31 @@ public void testShowLocksFilterOptions() throws Exception { HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr1); - cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)"); + cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)", true); checkCmdOnDriver(cpr); txnMgr1.acquireLocks(driver.getPlan(), ctx, "Tom"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)"); + cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)", true); checkCmdOnDriver(cpr); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Jerry"); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr3); - cpr = driver.compileAndRespond("select * from db2.t15"); + cpr = driver.compileAndRespond("select * from db2.t15", true); checkCmdOnDriver(cpr); txnMgr3.acquireLocks(driver.getPlan(), ctx, "Donald"); HiveTxnManager txnMgr4 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr4); - cpr = driver.compileAndRespond("select * from db2.t16"); + cpr = driver.compileAndRespond("select * from db2.t16", true); checkCmdOnDriver(cpr); txnMgr4.acquireLocks(driver.getPlan(), ctx, "Hillary"); HiveTxnManager txnMgr5 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr5); - cpr = driver.compileAndRespond("select * from db2.t14"); + cpr = driver.compileAndRespond("select * from db2.t14", true); checkCmdOnDriver(cpr); txnMgr5.acquireLocks(driver.getPlan(), ctx, "Obama"); @@ -900,13 +900,13 @@ public void testWriteSetTracking1() throws Exception { "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); + checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); txnMgr.commitTxn(); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Alexandra"); txnMgr2.commitTxn(); } @@ -930,7 +930,7 @@ public void testWriteSetTracking2() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.openTxn(ctx, "Peter"); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter"); txnMgr2.openTxn(ctx, "Catherine"); List locks = getLocks(txnMgr); @@ -938,7 +938,7 @@ public void testWriteSetTracking2() throws Exception { //note that "update" uses dynamic partitioning thus lock is on the table not partition checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks); txnMgr.commitTxn(); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine"); txnMgr2.commitTxn(); } @@ -953,7 +953,7 @@ public void testWriteSetTracking3() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); long txnId = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); @@ -961,7 +961,7 @@ public void testWriteSetTracking3() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); long txnId2 = txnMgr2.getCurrentTxnId(); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here @@ -1009,7 +1009,7 @@ public void testWriteSetTracking4() throws Exception { checkCmdOnDriver(cpr); txnMgr.openTxn(ctx, "Long Running"); - checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); @@ -1020,7 +1020,7 @@ public void testWriteSetTracking4() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "Short Running"); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'", true));//no such partition txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running"); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1043,7 +1043,7 @@ public void testWriteSetTracking4() throws Exception { Assert.assertEquals( 0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); txnMgr2.openTxn(ctx, "T3"); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'", true));//pretend this partition exists txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3"); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1069,7 +1069,7 @@ public void testWriteSetTracking4() throws Exception { houseKeeper.run(); //since T3 overlaps with Long Running (still open) GC does nothing Assert.assertEquals(1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1", true));//no rows match txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); writeIds = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr.getCurrentTxnId()), @@ -1104,12 +1104,12 @@ public void testWriteSetTracking5() throws Exception { txnMgr.openTxn(ctx, "Known"); long txnId = txnMgr2.openTxn(ctx, "Unknown"); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1140,14 +1140,14 @@ public void testWriteSetTracking6() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " + "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113")); + checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Works"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); locks = getLocks(txnMgr); @@ -1181,7 +1181,7 @@ public void testWriteSetTracking7() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); //test with predicates such that partition pruning works - checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); + checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'", true)); long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1190,7 +1190,7 @@ public void testWriteSetTracking7() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); + checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'", true)); long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1232,7 +1232,7 @@ public void testWriteSetTracking7() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4 swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1", true)); long idTxnUpdate3 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); @@ -1242,7 +1242,7 @@ public void testWriteSetTracking7() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2", true)); long idTxnUpdate4 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); locks = getLocks(txnMgr); @@ -1293,7 +1293,7 @@ public void testWriteSetTracking8() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1", true)); long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1303,7 +1303,7 @@ public void testWriteSetTracking8() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'", true)); long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1351,7 +1351,7 @@ public void testWriteSetTracking9() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1", true)); long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1361,7 +1361,7 @@ public void testWriteSetTracking9() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2", true)); long idTxnDelete1 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1415,7 +1415,7 @@ public void testWriteSetTracking10() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1424,7 +1424,7 @@ public void testWriteSetTracking10() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2", true)); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -1479,7 +1479,7 @@ public void testWriteSetTracking11() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));//start "delete from tab1" txn + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2", true));//start "delete from tab1" txn long txnIdDelete = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1490,10 +1490,10 @@ public void testWriteSetTracking11() throws Exception { //now start concurrent "select * from tab1" txn swapTxnManager(txnMgr); checkCmdOnDriver(driver.run("start transaction"));//start explicit txn so that txnMgr knows it - checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); + checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'", true)); long txnIdSelect = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2", true)); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 5, locks.size()); @@ -1609,7 +1609,7 @@ public void testMultiInsert() throws Exception { public void testShowLocksAgentInfo() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists XYZ (a int, b int)"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8")); + checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "XYZ"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); @@ -1652,7 +1652,7 @@ private void testMerge3Way(boolean cc) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 - "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1 long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(); @@ -1669,7 +1669,7 @@ private void testMerge3Way(boolean cc) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 - "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1 long txnId2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); locks = getLocks(); @@ -1860,10 +1860,10 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); checkCmdOnDriver(driver.run("create table source (a int, b int)")); if(causeConflict) { - checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); + checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1", true)); } else { - checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)")); + checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)", true)); } long txnid1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); @@ -1889,7 +1889,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + "on t.a=s.a " + "when matched then delete " + - "when not matched then insert values(s.a,s.b)")); + "when not matched then insert values(s.a,s.b)", true)); long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(); @@ -1966,7 +1966,7 @@ public void testDynamicPartitionInsert() throws Exception { "partitioned by (p int, q int) clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')")); long txnid1 = txnMgr.openTxn(ctx, "T1"); - checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); + checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1999,7 +1999,7 @@ public void testDynamicPartitionInsert() throws Exception { long txnid2 = txnMgr.openTxn(ctx, "T1"); - checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)")); + checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -2039,7 +2039,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); - checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); + checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1", true)); long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(); @@ -2053,7 +2053,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + "on target.p=source.p1 and target.a=source.a1 " + "when matched then update set b = 11 " + - "when not matched then insert values(a1,b1,p1,q1)")); + "when not matched then insert values(a1,b1,p1,q1)", true)); long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr); @@ -2180,7 +2180,7 @@ public void testShowTablesLock() throws Exception { checkCmdOnDriver(cpr); long txnid1 = txnMgr.openTxn(ctx, "Fifer"); - checkCmdOnDriver(driver.compileAndRespond("insert into T values(1,3)")); + checkCmdOnDriver(driver.compileAndRespond("insert into T values(1,3)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -2190,7 +2190,7 @@ public void testShowTablesLock() throws Exception { DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "Fidler"); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("show tables")); + checkCmdOnDriver(driver.compileAndRespond("show tables", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -2209,7 +2209,7 @@ public void testShowTablesLock() throws Exception { "into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)")); + checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -2219,7 +2219,7 @@ public void testShowTablesLock() throws Exception { txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "Fidler"); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("show tables")); + checkCmdOnDriver(driver.compileAndRespond("show tables", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -2236,12 +2236,12 @@ public void testFairness() throws Exception { dropTable(new String[] {"T6"}); CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select a from T6"); + cpr = driver.compileAndRespond("select a from T6", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table if exists T6"); + cpr = driver.compileAndRespond("drop table if exists T6", true); checkCmdOnDriver(cpr); //tries to get X lock on T6 and gets Waiting state LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); @@ -2254,7 +2254,7 @@ public void testFairness() throws Exception { swapTxnManager(txnMgr3); //this should block behind the X lock on T6 //this is a contrived example, in practice this query would of course fail after drop table - cpr = driver.compileAndRespond("select a from T6"); + cpr = driver.compileAndRespond("select a from T6", true); checkCmdOnDriver(cpr); ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);//gets S lock on T6 locks = getLocks(); @@ -2282,12 +2282,12 @@ public void testFairness2() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run( "insert into T7 partition(p) values(1,1),(1,2)"));//create 2 partitions - cpr = driver.compileAndRespond("select a from T7 "); + cpr = driver.compileAndRespond("select a from T7 ", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T7 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("alter table T7 drop partition (p=1)"); + cpr = driver.compileAndRespond("alter table T7 drop partition (p=1)", true); checkCmdOnDriver(cpr); //tries to get X lock on T7.p=1 and gets Waiting state LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, @@ -2302,7 +2302,7 @@ public void testFairness2() throws Exception { HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr3); //this should block behind the X lock on T7.p=1 - cpr = driver.compileAndRespond("select a from T7"); + cpr = driver.compileAndRespond("select a from T7", true); checkCmdOnDriver(cpr); //tries to get S lock on T7, S on T7.p=1 and S on T7.p=2 ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);