diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index b8771d2..33fbffe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -329,7 +329,9 @@ private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException { // Walk through all our inputs and set them to note that this read is part of an update or a // delete. for (ReadEntity input : inputs) { - input.setUpdateOrDelete(true); + if(isWritten(input)) { + input.setUpdateOrDelete(true); + } } if (inputIsPartitioned(inputs)) { @@ -377,6 +379,18 @@ private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException { } } + /** + * Check that {@code readEntity} is also being written + */ + private boolean isWritten(Entity readEntity) { + for(Entity writeEntity : outputs) { + //make sure to compare them as Entity, i.e. that it's the same table or partition, etc + if(writeEntity.toString().equalsIgnoreCase(readEntity.toString())) { + return true; + } + } + return false; + } private String operation() { if (updating()) return "update"; else if (deleting()) return "delete"; diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 836b507..6e2cf30 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -71,6 +71,39 @@ public void tearDown() throws Exception { TxnDbUtil.prepDb(); } @Test + public void testLocksInSubquery() throws Exception { + checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)")); + checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); + checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); + + checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); + txnMgr.openTxn("one"); + txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); + List locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1)); + txnMgr.rollbackTxn(); + + checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); + txnMgr.openTxn("one"); + txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1)); + txnMgr.rollbackTxn(); + + checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); + txnMgr.openTxn("three"); + txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks.get(1)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks.get(2)); + } + @Test public void createTable() throws Exception { CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)"); checkCmdOnDriver(cpr);