From e917ad2a738b351552ced219c9489e96cbb9e4e0 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Sat, 18 Apr 2020 01:41:29 +0200 Subject: [PATCH] init --- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 + .../apache/hadoop/hive/ql/io/AcidUtils.java | 26 +- .../hive/ql/lockmgr/TestDbTxnManager2.java | 1052 ++++++++++------- .../hive/metastore/LockRequestBuilder.java | 15 +- .../hive/metastore/LockTypeComparator.java | 56 + .../hadoop/hive/metastore/txn/TxnHandler.java | 57 +- .../hive/metastore/utils/LockTypeUtil.java | 18 + 7 files changed, 756 insertions(+), 473 deletions(-) create mode 100644 standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockTypeComparator.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7b3acad511..01f1ca12be 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2728,6 +2728,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Ensures commands with OVERWRITE (such as INSERT OVERWRITE) acquire Exclusive locks for\n" + "transactional tables. This ensures that inserts (w/o overwrite) running concurrently\n" + "are not hidden by the INSERT OVERWRITE."), + TXN_WRITE_X_LOCK("hive.txn.write.xlock", true, + "Ensures that for ACID resources commands with OVERWRITE (such as INSERT OVERWRITE) acquire EXCLUSIVE lock, " + + "UPDATE/DELETE acquire EXCL_WRITE lock, INSERT acquires SHARED_READ lock.\n " + + "When this flag is off, SHARED_WRITE lock is used for INSERT & UPDATE/DELETE, EXCL_WRITE - for commands with OVERWRITE." + + "This enables better level of parallelism and write-write conflict resolution at the commit phase"), HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true, "Whether Hive supports transactional stats (accurate stats for transactional tables)"), HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 77878ca40b..8822f24b8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -3024,10 +3024,10 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi case INSERT_OVERWRITE: t = AcidUtils.getTable(output); if (AcidUtils.isTransactionalTable(t)) { - if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK)) { + if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)) { compBuilder.setExclusive(); } else { - compBuilder.setSharedWrite(); + compBuilder.setExclWrite(); } compBuilder.setOperationType(DataOperationType.UPDATE); } else { @@ -3038,7 +3038,11 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi case INSERT: assert t != null; if (AcidUtils.isTransactionalTable(t)) { - compBuilder.setSharedRead(); + if (conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)) { + compBuilder.setSharedRead(); + } else { + compBuilder.setSharedWrite(); + } } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) { final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(), "Thought all the non native tables have an instance of storage handler"); @@ -3068,13 +3072,15 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi } break; - case UPDATE: - compBuilder.setSharedWrite(); - compBuilder.setOperationType(DataOperationType.UPDATE); - break; - case DELETE: - compBuilder.setSharedWrite(); - compBuilder.setOperationType(DataOperationType.DELETE); + case UPDATE: case DELETE: + assert t != null; + if (AcidUtils.isTransactionalTable(t) && !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)) { + compBuilder.setSharedWrite(); + } else { + compBuilder.setExclWrite(); + } + compBuilder.setOperationType(DataOperationType.valueOf( + output.getWriteType().name())); break; case DDL_NO_LOCK: diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 73d3b91585..e233f71cbc 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -55,7 +55,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -86,22 +85,22 @@ private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; - private Driver driver, driver2; + private Driver driver; private TxnStore txnHandler; - public TestDbTxnManager2() throws Exception { - conf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + public TestDbTxnManager2() { + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); TxnDbUtil.setConfValues(conf); } + @Before public void setUp() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); SessionState.start(conf); ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); - driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); TxnDbUtil.cleanDb(conf); TxnDbUtil.prepDb(conf); SessionState ss = SessionState.get(); @@ -109,12 +108,11 @@ public void setUp() throws Exception { txnMgr = ss.getTxnMgr(); Assert.assertTrue(txnMgr instanceof DbTxnManager); txnHandler = TxnUtils.getTxnStore(conf); - } + @After public void tearDown() throws Exception { driver.close(); - driver2.close(); if (txnMgr != null) { txnMgr.closeTxnManager(); } @@ -150,19 +148,33 @@ public void testMetadataOperationLocks() throws Exception { txnMgr.commitTxn(); conf.setBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE, isStrict); } + @Test public void testLocksInSubquery() throws Exception { + testLocksInSubquery(false); + } + @Test + public void testLocksInSubquerySharedWrite() throws Exception { + testLocksInSubquery(true); + } + + private void testLocksInSubquery(boolean sharedWrite) throws Exception { dropTable(new String[] {"T", "S", "R"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + driver.run("create table if not exists T (a int, b int)"); - driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists S (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists R (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); driver.compileAndRespond("delete from S where a in (select a from T where b = 1)", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "S", null, locks); txnMgr.rollbackTxn(); driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)", true); @@ -170,7 +182,8 @@ public void testLocksInSubquery() throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "S", null, locks); txnMgr.rollbackTxn(); driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)", true); @@ -179,11 +192,13 @@ public void testLocksInSubquery() throws Exception { Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "R", null, locks); txnMgr.rollbackTxn(); } + @Test - public void createTable() throws Exception { + public void testCreateTable() throws Exception { dropTable(new String[] {"T"}); driver.compileAndRespond("create table if not exists T (a int, b int)", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); @@ -193,19 +208,24 @@ public void createTable() throws Exception { txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); } + @Test - public void insertOverwriteCreate() throws Exception { - insertOverwriteCreate(false); + public void testInsertOverwriteCreate() throws Exception { + testInsertOverwriteCreate(false, false); } @Test - public void insertOverwriteCreateAcid() throws Exception { - insertOverwriteCreate(true); + public void testInsertOverwriteCreateAcid() throws Exception { + testInsertOverwriteCreate(true, false); } - private void insertOverwriteCreate(boolean isTransactional) throws Exception { - if(isTransactional) { - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); - } + @Test + public void testInsertOverwriteCreateSharedWrite() throws Exception { + testInsertOverwriteCreate(true, true); + } + + private void testInsertOverwriteCreate(boolean isTransactional, boolean sharedWrite) throws Exception { dropTable(new String[] {"T2", "T3"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, isTransactional); driver.run("create table if not exists T2(a int)"); driver.run("create table T3(a int) stored as ORC"); driver.compileAndRespond("insert overwrite table T3 select a from T2", true); @@ -213,7 +233,8 @@ private void insertOverwriteCreate(boolean isTransactional) throws Exception { List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T2", null, locks); - checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T3", null, locks); + checkLock((isTransactional && sharedWrite) ? LockType.EXCL_WRITE : LockType.EXCLUSIVE, + LockState.ACQUIRED, "default", "T3", null, locks); txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); driver.run("drop table if exists T1"); @@ -221,18 +242,22 @@ private void insertOverwriteCreate(boolean isTransactional) throws Exception { } @Test - public void insertOverwritePartitionedCreate() throws Exception { - insertOverwritePartitionedCreate(true); + public void testInsertOverwritePartitionedCreate() throws Exception { + testInsertOverwritePartitionedCreate(false, false); } @Test - public void insertOverwritePartitionedCreateAcid() throws Exception { - insertOverwritePartitionedCreate(false); + public void testInsertOverwritePartitionedCreateAcid() throws Exception { + testInsertOverwritePartitionedCreate(true, false); } - private void insertOverwritePartitionedCreate(boolean isTransactional) throws Exception { + @Test + public void testInsertOverwritePartitionedCreateSharedWrite() throws Exception { + testInsertOverwritePartitionedCreate(true, true); + } + + private void testInsertOverwritePartitionedCreate(boolean isTransactional, boolean sharedWrite) throws Exception { dropTable(new String[] {"T4", "T5"}); - if(isTransactional) { - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); - } + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, isTransactional); driver.run("create table T4 (name string, gpa double) partitioned by (age int) stored as ORC"); driver.run("create table T5(name string, age int, gpa double)"); driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5", true); @@ -240,7 +265,8 @@ private void insertOverwritePartitionedCreate(boolean isTransactional) throws Ex List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T5", null, locks); - checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T4", null, locks); + checkLock((isTransactional && sharedWrite) ? LockType.EXCL_WRITE : LockType.EXCLUSIVE, + LockState.ACQUIRED, "default", "T4", null, locks); txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); driver.run("drop table if exists T5"); @@ -248,24 +274,23 @@ private void insertOverwritePartitionedCreate(boolean isTransactional) throws Ex } @Test - public void basicBlocking() throws Exception { + public void testBasicBlocking() throws Exception { dropTable(new String[] {"T6"}); driver.run("create table if not exists T6(a int)"); driver.compileAndRespond("select a from T6", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); //gets S lock on T6 - List selectLocks = ctx.getHiveLocks(); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); driver.compileAndRespond("drop table if exists T6", true); //tries to get X lock on T1 and gets Waiting state - LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); + ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); txnMgr.rollbackTxn(); //release S on T6 //attempt to X on T6 again - succeed - lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); + ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T6", null, locks); @@ -274,11 +299,13 @@ public void basicBlocking() throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected number of locks found", 0, locks.size()); } + @Test - public void lockConflictDbTable() throws Exception { + public void testLockConflictDbTable() throws Exception { dropTable(new String[] {"temp.T7"}); driver.run("create database if not exists temp"); - driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists temp.T7(a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); driver.compileAndRespond("update temp.T7 set a = 5 where b = 6", true); //gets SS lock on T7 txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); @@ -296,10 +323,21 @@ public void lockConflictDbTable() throws Exception { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks); txnMgr2.commitTxn(); } + + @Test + public void testUpdateSelectUpdate() throws Exception { + testUpdateSelectUpdate(false); + } @Test - public void updateSelectUpdate() throws Exception { + public void testUpdateSelectUpdateSharedWrite() throws Exception { + testUpdateSelectUpdate(true); + } + + private void testUpdateSelectUpdate(boolean sharedWrite) throws Exception { dropTable(new String[] {"T8"}); - driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + driver.run("create table T8(a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); driver.compileAndRespond("delete from T8 where b = 89", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); //gets SS lock on T8 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); @@ -311,15 +349,19 @@ public void updateSelectUpdate() throws Exception { ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false); //waits for SS lock on T8 from fifer List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "T8", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "T8", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), "default", "T8", null, locks); driver.releaseLocksAndCommitOrRollback(false, txnMgr); ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "T8", null, locks); driver.releaseLocksAndCommitOrRollback(true, txnMgr2); swapTxnManager(txnMgr); driver.run("drop table if exists T6"); @@ -358,12 +400,12 @@ public void testLockRetryLimit() throws Exception { * This test is somewhat abusive in that it make DbLockManager retain locks for 2 * different queries (which are not part of the same transaction) which can never * happen in real use cases... but it makes testing convenient. - * @throws Exception */ @Test public void testLockBlockedBy() throws Exception { dropTable(new String[] {"TAB_BLOCKED"}); - driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table TAB_BLOCKED (a int, b int) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); driver.compileAndRespond("select * from TAB_BLOCKED", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM"); List locks = getLocks(); @@ -385,7 +427,8 @@ public void testLockBlockedBy() throws Exception { public void testDummyTxnManagerOnAcidTable() throws Exception { dropTable(new String[] {"T10", "T11"}); // Create an ACID table with DbTxnManager - driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table T10 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc"); // All DML should fail with DummyTxnManager on ACID table @@ -411,7 +454,8 @@ public void testDummyTxnManagerOnAcidTable() throws Exception { driver.compileAndRespond("update T10 set a=0 where b=1", true); } catch (CommandProcessorException e) { Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), e.getResponseCode()); - Assert.assertTrue(e.getMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); + Assert.assertTrue(e.getMessage().contains( + "Attempt to do update or delete using transaction manager that does not support these operations.")); } useDummyTxnManagerTemporarily(conf); @@ -419,9 +463,9 @@ public void testDummyTxnManagerOnAcidTable() throws Exception { driver.compileAndRespond("delete from T10", true); } catch (CommandProcessorException e) { Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), e.getResponseCode()); - Assert.assertTrue(e.getMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); + Assert.assertTrue(e.getMessage().contains( + "Attempt to do update or delete using transaction manager that does not support these operations.")); } - conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); } @@ -446,19 +490,21 @@ private void useDummyTxnManagerTemporarily(HiveConf hiveConf) throws Exception { * we clean up relevant records as soon as a table/partition is dropped. * * Note, here we don't need to worry about cleaning up TXNS table, since it's handled separately. - * @throws Exception */ @Test public void testMetastoreTablesCleanup() throws Exception { dropTable(new String[] {"temp.T10", "temp.T11", "temp.T12p", "temp.T13p"}); - driver.run("create database if not exists temp"); // Create some ACID tables: T10, T11 - unpartitioned table, T12p, T13p - partitioned table - driver.run("create table temp.T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table temp.T11 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table temp.T10 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table temp.T11 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); // Successfully insert some data into ACID tables, so that we have records in COMPLETED_TXN_COMPONENTS driver.run("insert into temp.T10 values (1, 1)"); @@ -470,9 +516,11 @@ public void testMetastoreTablesCleanup() throws Exception { driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (13, 13)"); driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)"); driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)"); - int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')"); + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')"); Assert.assertEquals(4, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')"); Assert.assertEquals(5, count); // Fail some inserts, so that we have records in TXN_COMPONENTS @@ -481,54 +529,71 @@ public void testMetastoreTablesCleanup() throws Exception { driver.run("insert into temp.T11 values (10, 10)"); driver.run("insert into temp.T12p partition (ds='today', hour='1') values (11, 11)"); driver.run("insert into temp.T13p partition (ds='today', hour='1') values (12, 12)"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(4, count); conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); // Drop a table/partition; corresponding records in TXN_COMPONENTS and COMPLETED_TXN_COMPONENTS should disappear - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE='t10'"); Assert.assertEquals(1, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE='t10'"); Assert.assertEquals(2, count); driver.run("drop table temp.T10"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE='t10'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE='t10'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'"); Assert.assertEquals(1, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'"); Assert.assertEquals(1, count); driver.run("alter table temp.T12p drop partition (ds='today', hour='1')"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'"); Assert.assertEquals(0, count); // Successfully perform compaction on a table/partition, so that we have successful records in COMPLETED_COMPACTIONS driver.run("alter table temp.T11 compact 'minor'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); TestTxnCommands2.runWorker(conf); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); TestTxnCommands2.runCleaner(conf); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'"); Assert.assertEquals(1, count); driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'minor'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); TestTxnCommands2.runWorker(conf); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); TestTxnCommands2.runCleaner(conf); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'"); Assert.assertEquals(1, count); // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS. @@ -537,127 +602,165 @@ public void testMetastoreTablesCleanup() throws Exception { driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (15, 15)"); conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); driver.run("alter table temp.T11 compact 'major'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); TestTxnCommands2.runWorker(conf); // will fail - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'"); Assert.assertEquals(1, count); driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); TestTxnCommands2.runWorker(conf); // will fail - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'"); Assert.assertEquals(1, count); conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false); // Put 2 records into COMPACTION_QUEUE and do nothing driver.run("alter table temp.T11 compact 'major'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); // Drop a table/partition, corresponding records in COMPACTION_QUEUE and COMPLETED_COMPACTIONS should disappear driver.run("drop table temp.T11"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t11'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE='t11'"); Assert.assertEquals(0, count); driver.run("alter table temp.T12p drop partition (ds='tomorrow', hour='2')"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE='t12p'"); Assert.assertEquals(0, count); // Put 1 record into COMPACTION_QUEUE and do nothing driver.run("alter table temp.T13p partition (ds='today', hour='1') compact 'major'"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); // Drop database, everything in all 4 meta tables should disappear - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(1, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(2, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(1, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(0, count); driver.run("drop database if exists temp cascade"); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS " + + "where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS " + + "where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE " + + "where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(0, count); - count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')"); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS " + + "where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')"); Assert.assertEquals(0, count); } /** * collection of queries where we ensure that we get the locks that are expected - * @throws Exception */ @Test - public void checkExpectedLocks() throws Exception { + public void testCheckExpectedLocks() throws Exception { + testCheckExpectedLocks(false); + } + @Test + public void testCheckExpectedLocksSharedWrite() throws Exception { + testCheckExpectedLocks(true); + } + + private void testCheckExpectedLocks(boolean sharedWrite) throws Exception { dropTable(new String[] {"acidPart", "nonAcidPart"}); - driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + + driver.run("create table acidPart(a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) " + + "stored as orc TBLPROPERTIES ('transactional'='false')"); driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)", true); - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks); - txnMgr.rollbackTxn();; + txnMgr.rollbackTxn(); driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)", true); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks); txnMgr.rollbackTxn(); driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)", true); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "acidPart", null, locks); txnMgr.rollbackTxn(); driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)", true); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "acidPart", "p=1", locks); txnMgr.rollbackTxn(); driver.compileAndRespond("update acidPart set b = 17 where a = 1", true); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "acidPart", null, locks); txnMgr.rollbackTxn(); driver.compileAndRespond("update acidPart set b = 17 where p = 1", true); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); //https://issues.apache.org/jira/browse/HIVE-13212 + //https://issues.apache.org/jira/browse/HIVE-13212 + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "acidPart", null, locks); txnMgr.rollbackTxn(); } + /** * Check to make sure we acquire proper locks for queries involving acid and non-acid tables */ @Test - public void checkExpectedLocks2() throws Exception { + public void testCheckExpectedLocks2() throws Exception { dropTable(new String[] {"tab_acid", "tab_not_acid"}); driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -680,7 +783,7 @@ public void checkExpectedLocks2() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "T2"); driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)", true); - LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 7, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_acid", null, locks); @@ -715,12 +818,12 @@ public void checkExpectedLocks2() throws Exception { * Check to make sure we acquire proper locks for queries involving non-strict locking */ @Test - public void checkExpectedReadLocksForNonAcidTables() throws Exception { + public void testCheckExpectedReadLocksForNonAcidTables() throws Exception { dropTable(new String[] {"tab_acid", "tab_not_acid"}); driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " + - "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); driver.run("create table if not exists tab_not_acid (na int, nb int) partitioned by (np string) " + - "clustered by (na) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + "clustered by (na) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')"); driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')"); @@ -740,7 +843,7 @@ public void checkExpectedReadLocksForNonAcidTables() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "T2"); driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)", true); - LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 4, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_acid", null, locks); @@ -793,13 +896,22 @@ public void testLockingOnInsertOverwriteNonNativeTables() throws Exception { /** The list is small, and the object is generated, so we don't use sets/equals/etc. */ public static ShowLocksResponseElement checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, List actuals) { + return checkLock(expectedType, expectedState, expectedDb, expectedTable, expectedPartition, actuals, false); + } + + private static ShowLocksResponseElement checkLock(LockType expectedType, LockState expectedState, String expectedDb, + String expectedTable, String expectedPartition, List actuals, boolean skipFirst) { + boolean skip = skipFirst; for (ShowLocksResponseElement actual : actuals) { if (expectedType == actual.getType() && expectedState == actual.getState() && StringUtils.equals(normalizeCase(expectedDb), normalizeCase(actual.getDbname())) && StringUtils.equals(normalizeCase(expectedTable), normalizeCase(actual.getTablename())) && StringUtils.equals( normalizeCase(expectedPartition), normalizeCase(actual.getPartname()))) { - return actual; + if(!skip){ + return actual; + } + skip = false; } } Assert.fail("Could't find {" + expectedType + ", " + expectedState + ", " + expectedDb @@ -817,6 +929,7 @@ public static ShowLocksResponseElement checkLock(LockType expectedType, LockStat public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { return SessionState.get().setTxnMgr(txnMgr); } + @Test public void testShowLocksFilterOptions() throws Exception { driver.run("drop table if exists db1.t14"); @@ -828,10 +941,14 @@ public void testShowLocksFilterOptions() throws Exception { driver.run("create database if not exists db1"); driver.run("create database if not exists db2"); - driver.run("create table if not exists db1.t14 (a int, b int) partitioned by (ds string) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table if not exists db2.t14 (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table if not exists db2.t15 (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - driver.run("create table if not exists db2.t16 (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists db1.t14 (a int, b int) partitioned by (ds string) " + + "clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists db2.t14 (a int, b int) " + + "clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists db2.t15 (a int, b int) " + + "clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table if not exists db2.t16 (a int, b int) " + + "clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); // Acquire different locks at different levels @@ -888,8 +1005,7 @@ public void testShowLocksFilterOptions() throws Exception { // Note that it shouldn't show t14 from db2 // SHOW LOCKS t14 PARTITION ds='today' - Map partSpec = new HashMap(); - partSpec.put("ds", "today"); + Map partSpec = Collections.singletonMap("ds", "today"); locks = getLocksWithFilterOptions(txnMgr, null, "t14", partSpec); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "db1", "t14", "ds=today", locks); @@ -900,6 +1016,7 @@ public void testShowLocksFilterOptions() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "db2", "t15", null, locks); } + private static String normalizeCase(String s) { return s == null ? null : s.toLowerCase(); } @@ -907,11 +1024,13 @@ private static String normalizeCase(String s) { private List getLocks() throws Exception { return getLocks(txnMgr); } + private List getLocks(HiveTxnManager txnMgr) throws Exception { ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(); return rsp.getLocks(); } - /** + + /** * txns update same resource but do not overlap in time - no conflict */ @Test @@ -930,11 +1049,13 @@ public void testWriteSetTracking1() throws Exception { txnMgr2.acquireLocks(driver.getPlan(), ctx, "Alexandra"); txnMgr2.commitTxn(); } + private void dropTable(String[] tabs) throws Exception { for(String tab : tabs) { driver.run("drop table if exists " + tab); } } + /** * txns overlap in time but do not update same resource - no conflict */ @@ -960,7 +1081,7 @@ public void testWriteSetTracking2() throws Exception { txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine"); txnMgr2.commitTxn(); } - + /** * txns overlap and update the same resource - can't commit 2nd txn */ @@ -985,7 +1106,7 @@ public void testWriteSetTracking3() throws Exception { locks = getLocks(txnMgr2); //should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); long writeId = txnMgr.getTableWriteId("default", "TAB_PART"); AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeId, "default", "TAB_PART", Collections.singletonList("p=blah")); @@ -1005,12 +1126,13 @@ public void testWriteSetTracking3() throws Exception { catch (LockException e) { expectedException = e; } - Assert.assertTrue("Didn't get exception", expectedException != null); + Assert.assertNotNull("Didn't get exception", expectedException); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", "Aborting [txnid:"+txnId2+","+txnId2+"] due to a write conflict on default/tab_part/p=blah committed by [txnid:"+txnId+","+txnId2+"] u/u", expectedException.getCause().getMessage()); } + /** * txns overlap, update same resource, simulate multi-stmt txn case * Also tests that we kill txn when it tries to acquire lock if we already know it will not be committed @@ -1106,9 +1228,9 @@ public void testWriteSetTracking4() throws Exception { houseKeeper.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); } + /** * overlapping txns updating the same resource but 1st one rolls back; 2nd commits - * @throws Exception */ @Test public void testWriteSetTracking5() throws Exception { @@ -1131,7 +1253,7 @@ public void testWriteSetTracking5() throws Exception { locks = getLocks(txnMgr2); //should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); txnMgr.rollbackTxn(); AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "TAB_PART"); @@ -1139,15 +1261,15 @@ public void testWriteSetTracking5() throws Exception { AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst); Assert.assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId()); - AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeIds.getTxnToWriteIds().get(0).getWriteId(), - "default", "TAB_PART", - Arrays.asList("p=blah")); + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeIds.getTxnToWriteIds().get(0).getWriteId(), + "default", "TAB_PART", Collections.singletonList("p=blah")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); txnMgr2.commitTxn(); //since conflicting txn rolled back, commit succeeds Assert.assertEquals(1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); } + /** * check that read query concurrent with txn works ok */ @@ -1182,10 +1304,9 @@ public void testWriteSetTracking6() throws Exception { writeSetService.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); } - + /** * 2 concurrent txns update different partitions of the same table and succeed - * @throws Exception */ @Test public void testWriteSetTracking7() throws Exception { @@ -1198,7 +1319,6 @@ public void testWriteSetTracking7() throws Exception { swapTxnManager(txnMgr2); //test with predicates such that partition pruning works driver.compileAndRespond("update tab2 set b = 7 where p='two'", true); - long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); @@ -1207,7 +1327,6 @@ public void testWriteSetTracking7() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); driver.compileAndRespond("update tab2 set b = 7 where p='one'", true); - long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1233,13 +1352,19 @@ public void testWriteSetTracking7() throws Exception { txnMgr.commitTxn(); //txnid:idTxnUpdate2 //now both txns concurrently updated TAB2 but different partitions. - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'")); //2 from txnid:1, 1 from txnid:2, 1 from txnid:3 - Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null")); + Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 4, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null")); //================ //test with predicates such that partition pruning doesn't kick in @@ -1249,7 +1374,6 @@ public void testWriteSetTracking7() throws Exception { driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"); //txnid:4 swapTxnManager(txnMgr2); driver.compileAndRespond("update tab1 set b = 7 where b=1", true); - long idTxnUpdate3 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1259,14 +1383,13 @@ public void testWriteSetTracking7() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); driver.compileAndRespond("update tab1 set b = 7 where b = 2", true); - long idTxnUpdate4 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 4, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //this simulates the completion of txnid:idTxnUpdate3 writeId = txnMgr2.getTableWriteId("default", "tab1"); @@ -1289,14 +1412,21 @@ public void testWriteSetTracking7() throws Exception { txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); //txnid:idTxnUpdate4 - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); //2 from insert + 1 for each update stmt - Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); + Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 4, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + /** * Concurrent updates with partition pruning predicate and w/o one */ @@ -1309,7 +1439,6 @@ public void testWriteSetTracking8() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); driver.compileAndRespond("update tab1 set b = 7 where b=1", true); - long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1319,13 +1448,12 @@ public void testWriteSetTracking8() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); driver.compileAndRespond("update tab1 set b = 7 where p='two'", true); - long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); //this simulates the completion of txnid:idTxnUpdate1 long writeId = txnMgr2.getTableWriteId("default", "tab1"); @@ -1347,13 +1475,20 @@ public void testWriteSetTracking8() throws Exception { txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); //txnid:idTxnUpdate2 - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); - Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); + Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 4, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + /** * Concurrent update/delete of different partitions - should pass */ @@ -1382,7 +1517,7 @@ public void testWriteSetTracking9() throws Exception { Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); //this simulates the completion of txnid:idTxnUpdate1 long writeId = txnMgr2.getTableWriteId("default", "tab1"); @@ -1404,19 +1539,35 @@ public void testWriteSetTracking9() throws Exception { txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); //txnid:idTxnUpdate2 - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (idTxnUpdate1 - 1) + " and ctc_table='tab1'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnUpdate1 + " and ctc_table='tab1' and ctc_partition='p=one'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnDelete1 + " and ctc_table='tab1' and ctc_partition='p=two'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); - Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (idTxnUpdate1 - 1) + + " and ctc_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnUpdate1 + + " and ctc_table='tab1' and ctc_partition='p=one'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnDelete1 + + " and ctc_table='tab1' and ctc_partition='p=two'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); + Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 4, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + /** * Concurrent update/delete of same partition - should fail to commit */ @@ -1443,7 +1594,7 @@ public void testWriteSetTracking10() throws Exception { Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); //this simulates the completion of "Update tab2" txn long writeId = txnMgr2.getTableWriteId("default", "tab1"); @@ -1470,16 +1621,21 @@ public void testWriteSetTracking10() throws Exception { catch(LockException e) { exception = e; } - Assert.assertNotEquals("Expected exception", null, exception); + Assert.assertNotNull("Expected exception", exception); Assert.assertEquals("Exception msg doesn't match", "Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two committed by [txnid:4,5] d/u", exception.getCause().getMessage()); - Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); - Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); + Assert.assertEquals("WRITE_SET mismatch: " + + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); + Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 3, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + /** * Concurrent delete/delete of same partition - should NOT pass */ @@ -1513,7 +1669,7 @@ public void testWriteSetTracking11() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); //this simulates the completion of "delete from tab1" txn long writeId = txnMgr2.getTableWriteId("default", "tab1"); @@ -1549,14 +1705,19 @@ public void testWriteSetTracking11() throws Exception { Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent(conf, - "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete)); + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' " + + "and ws_table='tab1' and ws_txnid=" + txnIdDelete)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), 0, TxnDbUtil.countQueryAgent(conf, - "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect)); - Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); + "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' " + + "and ws_table='tab1' and ws_txnid=" + txnIdSelect)); + Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 3, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + @Test public void testCompletedTxnComponents() throws Exception { dropTable(new String[] {"TAB1", "tab_not_acid2"}); @@ -1565,17 +1726,22 @@ public void testCompletedTxnComponents() throws Exception { driver.run("create table if not exists tab_not_acid2 (a int, b int)"); driver.run("insert into tab_not_acid2 values(1,1),(2,2)"); //writing both acid and non-acid resources in the same txn - driver.run("from tab_not_acid2 insert into tab1 partition(p='two')(a,b) select a,b insert into tab_not_acid2(a,b) select a,b "); //txnid:1 + driver.run("from tab_not_acid2 insert into tab1 partition(p='two')(a,b) select a,b " + + "insert into tab_not_acid2(a,b) select a,b "); //txnid:1 Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS")); //only expect transactional components to be in COMPLETED_TXN_COMPONENTS Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tab1'")); + 1, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tab1'")); } - /** - * ToDo: multi-insert into txn table and non-tx table should be prevented + /* + * TODO: + * multi-insert into txn table and non-tx table should be prevented, + * concurrent insert/update of same partition - should pass */ + @Test public void testMultiInsert() throws Exception { dropTable(new String[] {"TAB1", "tab_not_acid"}); @@ -1588,20 +1754,21 @@ public void testMultiInsert() throws Exception { driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"); //txinid:8 //writing both acid and non-acid resources in the same txn //tab1 write is a dynamic partition insert - driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"); //txnid:9 + driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p " + + "insert into tab_not_acid(a,b) select a,b where p='two'"); //txnid:9 Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS")); //only expect transactional components to be in COMPLETED_TXN_COMPONENTS Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9")); Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9 and ctc_table='tab1'")); + 2, TxnDbUtil.countQueryAgent(conf, + "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9 and ctc_table='tab1'")); } - //todo: Concurrent insert/update of same partition - should pass - @Test public void testMultiInsertOnDynamicallyPartitionedMmTable() throws Exception { + @Test + public void testMultiInsertOnDynamicallyPartitionedMmTable() throws Exception { dropTable(new String[] {"tabMmDp", "tab_not_acid"}); - driver.run("create table if not exists tabMmDp (a int, b int) partitioned by (p string) " + "stored as orc " + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); @@ -1636,14 +1803,8 @@ public void testMultiInsert() throws Exception { rqst.setDbname(dbName); rqst.setTablename(tblName); if (partSpec != null) { - List keyList = new ArrayList(); - List valList = new ArrayList(); - for (String partKey : partSpec.keySet()) { - String partVal = partSpec.remove(partKey); - keyList.add(partKey); - valList.add(partVal); - } - String partName = FileUtils.makePartName(keyList, valList); + String partName = FileUtils.makePartName( + new ArrayList<>(partSpec.keySet()), new ArrayList<>(partSpec.values())); rqst.setPartname(partName); } ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(rqst); @@ -1660,21 +1821,27 @@ public void testShowLocksAgentInfo() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks); Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo()); } + @Test - public void testMerge3Way01() throws Exception { - testMerge3Way(false); + public void testMerge3Way() throws Exception { + testMerge3Way(false, false); } @Test - public void testMerge3Way02() throws Exception { - testMerge3Way(true); + public void testMerge3WayConflict() throws Exception { + testMerge3Way(true, false); + } + @Test + public void testMerge3WayConflictSharedWrite() throws Exception { + testMerge3Way(true, true); } /** - * @param cc whether to cause a WW conflict or not - * @throws Exception + * @param causeConflict whether to cause a WW conflict or not */ - private void testMerge3Way(boolean cc) throws Exception { - dropTable(new String[] {"target","source", "source2"}); + private void testMerge3Way(boolean causeConflict, boolean sharedWrite) throws Exception { + dropTable(new String[]{"target", "source", "source2"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + driver.run("create table target (a int, b int) " + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -1692,7 +1859,6 @@ private void testMerge3Way(boolean cc) throws Exception { //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2 "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"); - driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 @@ -1701,40 +1867,60 @@ private void testMerge3Way(boolean cc) throws Exception { txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); //start concurrent txn DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + - "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 - "when matched and t.a in (" + (cc ? "3,7" : "11, 13") + ") then delete " + //if cc deletes from p=1/q=2, p=2/q=2, else delete nothing + "when matched and t.a=" + (causeConflict ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 + "when matched and t.a in (" + (causeConflict ? "3,7" : "11, 13") + ") then delete " + //if cc deletes from p=1/q=2, p=2/q=2, else delete nothing "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true); //insert p=1/q=2, p=1/q=3 and new part 1/1 long txnId2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 10, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); - long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid(); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source2", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + + long extLockId = checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", null, locks, sharedWrite).getLockid(); + checkLock(LockType.SHARED_READ, (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "source2", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", "p=1/q=2", locks, sharedWrite); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", "p=1/q=3", locks, sharedWrite); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", "p=2/q=2", locks, sharedWrite); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); + //complete 1st txn long writeId = txnMgr.getTableWriteId("default", "target"); AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, writeId, "default", "target", @@ -1751,63 +1937,71 @@ private void testMerge3Way(boolean cc) throws Exception { txnHandler.addDynamicPartitions(adp); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 1, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + - " and tc_operation_type='u'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 1, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='u'")); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 2, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + " and tc_operation_type='d'")); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 3, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + - " and tc_operation_type='i'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 3, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='i'")); + txnMgr.commitTxn(); //commit T1 Assert.assertEquals( "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 6, - TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId1)); + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 6, + TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId1)); Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, - TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId1 + - " and ws_operation_type='u'")); + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, + TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId1 + + " and ws_operation_type='u'")); Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 2, - TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId1 + - " and ws_operation_type='d'")); + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 2, + TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId1 + + " and ws_operation_type='d'")); //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2)); + //complete 2nd txn writeId = txnMgr2.getTableWriteId("default", "target"); adp = new AddDynamicPartitions(txnId2, writeId, "default", "target", - Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2")); //update clause + Collections.singletonList(causeConflict ? "p=1/q=3" : "p=1/p=2")); //update clause adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - if(cc) { + + if (causeConflict) { adp = new AddDynamicPartitions(txnId2, writeId, "default", "target", Arrays.asList("p=1/q=2", "p=2/q=2")); //delete clause adp.setOperationType(DataOperationType.DELETE); @@ -1819,22 +2013,22 @@ private void testMerge3Way(boolean cc) throws Exception { txnHandler.addDynamicPartitions(adp); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 1, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + - " and tc_operation_type='u'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 1, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + + " and tc_operation_type='u'")); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - (cc ? 2 : 0), - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + - " and tc_operation_type='d'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + (causeConflict ? 2 : 0), + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + + " and tc_operation_type='d'")); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 3, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + - " and tc_operation_type='i'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 3, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + + " and tc_operation_type='i'")); LockException expectedException = null; try { @@ -1843,14 +2037,13 @@ private void testMerge3Way(boolean cc) throws Exception { catch (LockException e) { expectedException = e; } - if(cc) { + if (causeConflict) { Assert.assertNotNull("didn't get exception", expectedException); try { Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " + "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " + "committed by [txnid:10,11] u/u", expectedException.getMessage()); - } - catch(ComparisonFailure ex) { + } catch (ComparisonFailure ex) { //the 2 txns have 2 conflicts between them so check for either failure since which one is //reported (among the 2) is not deterministic Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " + @@ -1859,45 +2052,47 @@ private void testMerge3Way(boolean cc) throws Exception { } Assert.assertEquals( "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 0, - TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2)); + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2)); Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 0, - TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2)); - } - else { + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 0, + TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2)); + } else { Assert.assertNull("Unexpected exception " + expectedException, expectedException); Assert.assertEquals( "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), - 4, - TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2)); + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"), + 4, + TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2)); Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 1, - TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 + - " and ws_operation_type='u'")); + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 1, + TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 + + " and ws_operation_type='u'")); Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + - TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), - 0, - TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 + - " and ws_operation_type='d'")); + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), + 0, + TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 + + " and ws_operation_type='d'")); } - - + } + + @Test + public void testMergeUnpartitioned() throws Exception { + testMergeUnpartitioned(false, false); } @Test - public void testMergeUnpartitioned01() throws Exception { - testMergeUnpartitioned(true); + public void testMergeUnpartitionedConflict() throws Exception { + testMergeUnpartitioned(true, false); } @Test - public void testMergeUnpartitioned02() throws Exception { - testMergeUnpartitioned(false); + public void testMergeUnpartitionedConflictSharedWrite() throws Exception { + testMergeUnpartitioned(true, true); } /** @@ -1905,36 +2100,39 @@ public void testMergeUnpartitioned02() throws Exception { * Check that proper locks are acquired and Write conflict detection works and the state * of internal table. * @param causeConflict true to make 2 operations such that they update the same entity - * @throws Exception */ - private void testMergeUnpartitioned(boolean causeConflict) throws Exception { + private void testMergeUnpartitioned(boolean causeConflict, boolean sharedWrite) throws Exception { dropTable(new String[] {"target","source"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + driver.run("create table target (a int, b int) " + "clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')"); driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)"); driver.run("create table source (a int, b int)"); - if(causeConflict) { + + if (causeConflict) { driver.compileAndRespond("update target set b = 2 where a=1", true); - } - else { + } else { driver.compileAndRespond("insert into target values(9,10),(11,12)", true); } long txnid1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 1, //no DP, so it's populated from lock info - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 1, //no DP, so it's populated from lock info + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); List locks = getLocks(txnMgr); if (causeConflict) { Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", null, locks); } else { Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "target", null, locks); } DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); @@ -1949,38 +2147,40 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); - checkLock(LockType.SHARED_READ, causeConflict ? LockState.WAITING : LockState.ACQUIRED, - "default", "source", null, locks); - long extLockId = checkLock(LockType.SHARED_WRITE, causeConflict ? LockState.WAITING : LockState.ACQUIRED, - "default", "target", null, locks).getLockid(); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_READ, (causeConflict && !sharedWrite) ? LockState.WAITING : LockState.ACQUIRED, + "default", "source", null, locks); + long extLockId = checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (causeConflict && !sharedWrite) ? LockState.WAITING : LockState.ACQUIRED, + "default", "target", null, locks, sharedWrite).getLockid(); txnMgr.commitTxn(); //commit T1 Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), causeConflict ? 1 : 0, //Inserts are not tracked by WRITE_SET - TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnid1 + - " and ws_operation_type=" + (causeConflict ? "'u'" : "'i'"))); - + TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnid1 + + " and ws_operation_type=" + (causeConflict ? "'u'" : "'i'"))); //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", null, locks); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 1, // - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 1, // + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 1, // - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + - "and tc_operation_type='d'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 1, // + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + + " and tc_operation_type='d'")); //complete T2 txn LockException expectedException = null; @@ -1990,14 +2190,13 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { catch (LockException e) { expectedException = e; } - if(causeConflict) { - Assert.assertTrue("Didn't get exception", expectedException != null); + if (causeConflict) { + Assert.assertNotNull("Didn't get exception", expectedException); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", "Aborting [txnid:7,7] due to a write conflict on default/target committed by [txnid:6,7] d/u", expectedException.getCause().getMessage()); - } - else { + } else { Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), 1, //Unpartitioned table: 1 row for Delete; Inserts are not tracked in WRITE_SET @@ -2005,9 +2204,9 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { " and ws_operation_type='d'")); } } + /** * Check that DP with partial spec properly updates TXN_COMPONENTS - * @throws Exception */ @Test public void testDynamicPartitionInsert() throws Exception { @@ -2024,15 +2223,15 @@ public void testDynamicPartitionInsert() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); Assert.assertEquals( "HIVE_LOCKS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + - TxnDbUtil.queryToString(conf, "select * from HIVE_LOCKS"), - 1, - TxnDbUtil.countQueryAgent(conf, "select count(*) from HIVE_LOCKS where hl_txnid=" + txnid1)); + TxnDbUtil.queryToString(conf, "select * from HIVE_LOCKS"), + 1, + TxnDbUtil.countQueryAgent(conf, "select count(*) from HIVE_LOCKS where hl_txnid=" + txnid1)); txnMgr.rollbackTxn(); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); //now actually write to table to generate some partitions driver.run("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"); driver.run("select count(*) from target"); @@ -2046,7 +2245,6 @@ public void testDynamicPartitionInsert() throws Exception { //txnid+1 because we want txn used by previous driver.run("insert....) TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1))); - long txnid2 = txnMgr.openTxn(ctx, "T1"); driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); @@ -2055,32 +2253,40 @@ public void testDynamicPartitionInsert() throws Exception { //Plan is using DummyPartition, so can only lock the table... unfortunately checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); long writeId = txnMgr.getTableWriteId("default", "target"); - AddDynamicPartitions adp = new AddDynamicPartitions(txnid2, writeId, "default", "target", Arrays.asList("p=1/q=2","p=1/q=2")); + AddDynamicPartitions adp = new AddDynamicPartitions(txnid2, writeId, "default", "target", + Arrays.asList("p=1/q=2","p=1/q=2")); adp.setOperationType(DataOperationType.INSERT); txnHandler.addDynamicPartitions(adp); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 2, //2 distinct partitions modified - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 2, //2 distinct partitions modified + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); txnMgr.commitTxn(); } + + @Test + public void testMergePartitioned() throws Exception { + testMergePartitioned(false, false); + } @Test - public void testMergePartitioned01() throws Exception { - testMergePartitioned(false); + public void testMergePartitionedConflict() throws Exception { + testMergePartitioned(true, false); } @Test - public void testMergePartitioned02() throws Exception { - testMergePartitioned(true); + public void testMergePartitionedConflictSharedWrite() throws Exception { + testMergePartitioned(true, true); } + /** * "run" an Update and Merge concurrently; Check that correct locks are acquired. * Check state of auxiliary ACID tables. * @param causeConflict - true to make the operations cause a Write conflict - * @throws Exception */ - private void testMergePartitioned(boolean causeConflict) throws Exception { + private void testMergePartitioned(boolean causeConflict, boolean sharedWrite) throws Exception { dropTable(new String[] {"target","source"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); + driver.run("create table target (a int, b int) " + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -2092,8 +2298,11 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); @@ -2106,27 +2315,39 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 7, locks.size()); - /** + /* * W locks from T1 are still there, so all locks from T2 block. * The Update part of Merge requests W locks for each existing partition in target. * The Insert part doesn't know which partitions may be written to: thus R lock on target table. - * */ - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks); - long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid(); - - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); - - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); - - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks); + */ + checkLock(LockType.SHARED_READ, (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "source", null, locks); + long extLockId = checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", null, locks).getLockid(); + + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", "p=1/q=2", locks, sharedWrite); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", "p=1/q=3", locks, sharedWrite); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + (sharedWrite ? LockState.ACQUIRED : LockState.WAITING), + "default", "target", "p=2/q=2", locks); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, //because it's using a DP write - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 0, //because it's using a DP write + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); + //complete T1 transaction (simulate writing to 2 partitions) long writeId = txnMgr.getTableWriteId("default", "target"); AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, writeId, "default", "target", @@ -2135,10 +2356,11 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { txnHandler.addDynamicPartitions(adp); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 2, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + - " and tc_operation_type='u'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='u'")); + txnMgr.commitTxn(); //commit T1 Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), @@ -2146,47 +2368,52 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId1 + " and ws_operation_type='u'")); - //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ), + LockState.ACQUIRED, "default", "target", null, locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE), + LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, //because it's using a DP write - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 0, //because it's using a DP write + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + //complete T2 txn //simulate Insert into 2 partitions writeId = txnMgr2.getTableWriteId("default", "target"); adp = new AddDynamicPartitions(txnid2, writeId, "default", "target", - Arrays.asList("p=1/q=2","p=1/q=3")); + Arrays.asList("p=1/q=2","p=1/q=3")); adp.setOperationType(DataOperationType.INSERT); txnHandler.addDynamicPartitions(adp); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 2, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='i'")); + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + + " and tc_operation_type='i'")); //simulate Update of 1 partitions; depending on causeConflict, choose one of the partitions //which was modified by the T1 update stmt or choose a non-conflicting one adp = new AddDynamicPartitions(txnid2, writeId, "default", "target", - Collections.singletonList(causeConflict ? "p=1/q=2" : "p=1/q=1")); + Collections.singletonList(causeConflict ? "p=1/q=2" : "p=1/q=1")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + - TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 1, - TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='u'")); - + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), + 1, + TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + + " and tc_operation_type='u'")); LockException expectedException = null; try { @@ -2195,14 +2422,13 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { catch (LockException e) { expectedException = e; } - if(causeConflict) { - Assert.assertTrue("Didn't get exception", expectedException != null); + if (causeConflict) { + Assert.assertNotNull("Didn't get exception", expectedException); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", "Aborting [txnid:7,7] due to a write conflict on default/target/p=1/q=2 committed by [txnid:6,7] u/u", expectedException.getCause().getMessage()); - } - else { + } else { Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"), 1, //1 partitions updated @@ -2218,14 +2444,13 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { /** * This test is mostly obsolete. The logic in the Driver.java no longer acquires any locks for * "show tables". Keeping the test for now in case we change that logic. - * @throws Exception */ @Test public void testShowTablesLock() throws Exception { dropTable(new String[] {"T", "T2"}); driver.run("create table T (a int, b int)"); - long txnid1 = txnMgr.openTxn(ctx, "Fifer"); + txnMgr.openTxn(ctx, "Fifer"); driver.compileAndRespond("insert into T values(1,3)", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -2246,7 +2471,6 @@ public void testShowTablesLock() throws Exception { Assert.assertEquals("Lock remained", 0, getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); - swapTxnManager(txnMgr); driver.run( "create table T2 (a int, b int) partitioned by (p int) clustered by (a) " + @@ -2272,6 +2496,7 @@ public void testShowTablesLock() throws Exception { Assert.assertEquals("Lock remained", 0, getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); } + @Test public void testFairness() throws Exception { dropTable(new String[] {"T6"}); @@ -2282,7 +2507,7 @@ public void testFairness() throws Exception { swapTxnManager(txnMgr2); driver.compileAndRespond("drop table if exists T6", true); //tries to get X lock on T6 and gets Waiting state - LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); + ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); @@ -2323,8 +2548,7 @@ public void testFairness2() throws Exception { swapTxnManager(txnMgr2); driver.compileAndRespond("alter table T7 drop partition (p=1)", true); //tries to get X lock on T7.p=1 and gets Waiting state - LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, - "Fiddler", false); + ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 4, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); @@ -2350,7 +2574,7 @@ public void testFairness2() throws Exception { txnMgr.commitTxn(); //release locks from "select a from T7" - to unblock hte drop partition //retest the the "drop partiton" X lock - lockState = ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid()); + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 4, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T7", "p=1", locks); @@ -2360,20 +2584,19 @@ public void testFairness2() throws Exception { txnMgr2.rollbackTxn(); //release the X lock on T7.p=1 //re-test the locks - lockState = ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7 + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7 locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); - } @Test public void testValidWriteIdListSnapshot() throws Exception { - // Create a transactional table dropTable(new String[] {"temp.T7"}); driver.run("create database if not exists temp"); + // Create a transactional table driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc " + "TBLPROPERTIES ('transactional'='true')"); @@ -2447,8 +2670,10 @@ public void testValidWriteIdListSnapshot() throws Exception { driver.run("drop database if exists temp cascade"); } + @Rule public TemporaryFolder exportFolder = new TemporaryFolder(); + /** * see also {@link org.apache.hadoop.hive.ql.TestTxnAddPartition} */ @@ -2471,9 +2696,19 @@ public void testAddPartitionLocks() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); } + @Test public void testLoadData() throws Exception { + testLoadData(false); + } + @Test + public void testLoadDataSharedWrite() throws Exception { + testLoadData(true); + } + + private void testLoadData(boolean sharedWrite) throws Exception { dropTable(new String[] {"T2"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); driver.run("create table T2(a int) stored as ORC TBLPROPERTIES ('transactional'='true')"); driver.run("insert into T2 values(1)"); String exportLoc = exportFolder.newFolder("1").toString(); @@ -2482,23 +2717,24 @@ public void testLoadData() throws Exception { txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T2", null, locks); + checkLock((sharedWrite ? LockType.EXCL_WRITE : LockType.EXCLUSIVE), + LockState.ACQUIRED, "default", "T2", null, locks); txnMgr.commitTxn(); } + @Test public void testMmConversionLocks() throws Exception { dropTable(new String[] {"T"}); driver.run("create table T (a int, b int) tblproperties('transactional'='false')"); driver.run("insert into T values(0,2),(1,4)"); - - driver.compileAndRespond("ALTER TABLE T set tblproperties" - + "('transactional'='true', 'transactional_properties'='insert_only')", true); + driver.compileAndRespond("ALTER TABLE T set tblproperties" + + "('transactional'='true', 'transactional_properties'='insert_only')", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); //gets X lock on T - List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); } + @Test public void testTruncate() throws Exception { dropTable(new String[] {"T"}); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java index 22902a9c20..6e5c4328cc 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java @@ -19,7 +19,6 @@ import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockType; import java.net.InetAddress; import java.net.UnknownHostException; @@ -42,6 +41,7 @@ public LockRequestBuilder() { this(null); } + public LockRequestBuilder(String agentInfo) { req = new LockRequest(); trie = new LockTrie(); @@ -116,6 +116,7 @@ public LockRequestBuilder addLockComponents(Collection components // valid key in a LinkedHashMap. So a database lock will map to (dbname, null, // null). private static class LockTrie { + LockTypeComparator lockTypeComparator = new LockTypeComparator(); Map trie; LockTrie() { @@ -161,14 +162,8 @@ private void setPart(LockComponent comp, PartTrie parts) { if (existing == null) { // No existing lock for this partition. parts.put(comp.getPartitionname(), comp); - } else if (existing.getType() != LockType.EXCLUSIVE && - (comp.getType() == LockType.EXCLUSIVE || - comp.getType() == LockType.SHARED_WRITE)) { - // We only need to promote if comp.type is > existing.type. For - // efficiency we check if existing is exclusive (in which case we - // need never promote) or if comp is exclusive or shared_write (in - // which case we can promote even though they may both be shared - // write). If comp is shared_read there's never a need to promote. + } else if (lockTypeComparator.compare(comp.getType(), existing.getType()) > 0) { + // We only need to promote if comp.type is > existing.type. parts.put(comp.getPartitionname(), comp); } } @@ -179,7 +174,5 @@ private void setPart(LockComponent comp, PartTrie parts) { private static class PartTrie extends LinkedHashMap { } - - } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockTypeComparator.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockTypeComparator.java new file mode 100644 index 0000000000..c6586497ee --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockTypeComparator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.metastore.api.LockType; + +import java.util.Comparator; +import java.util.Arrays; +import java.util.List; + + +/** + * Sort more restrictive locks after less restrictive ones. + */ +public class LockTypeComparator implements Comparator { + private static final String UNKNOWN_LOCK_TYPE = "Unknown LockType: "; + + // the higher the integer value, the more restrictive the lock type is + private List orderOfRestrictiveness = Arrays.asList( + LockType.SHARED_READ, LockType.SHARED_WRITE, LockType.EXCL_WRITE, LockType.EXCLUSIVE); + + @Override + public boolean equals(Object other) { + return this == other; + } + + @Override + public int compare(LockType t1, LockType t2) { + int t1Id = orderOfRestrictiveness.indexOf(t1); + int t2Id = orderOfRestrictiveness.indexOf(t2); + + if (t1Id == -1) { + throw new RuntimeException(UNKNOWN_LOCK_TYPE + t1); + } + if (t2Id == -1) { + throw new RuntimeException(UNKNOWN_LOCK_TYPE + t2); + } + return Integer.compare(t1Id, t2Id); + } + +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 331fd4cc8d..b294eef00b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -33,7 +33,6 @@ import java.util.BitSet; import java.util.Collections; import java.util.Comparator; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -69,6 +68,7 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; +import org.apache.hadoop.hive.metastore.LockTypeComparator; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -4065,7 +4065,8 @@ private boolean isPartitionLock() { } private static class LockInfoComparator implements Comparator { - private static final LockTypeComparator lockTypeComparator = new LockTypeComparator(); + private LockTypeComparator lockTypeComparator = new LockTypeComparator(); + public boolean equals(Object other) { return this == other; } @@ -4101,43 +4102,6 @@ public int compare(LockInfo info1, LockInfo info2) { } } - /** - * Sort more restrictive locks after less restrictive ones. Why? - * Consider insert overwirte table DB.T1 select ... from T2: - * this takes X lock on DB.T1 and S lock on T2 - * Also, create table DB.T3: takes S lock on DB. - * so the state of the lock manger is {X(T1), S(T2) S(DB)} all in acquired state. - * This is made possible by HIVE-10242. - * Now a select * from T1 will try to get S(T1) which according to the 'jumpTable' will - * be acquired once it sees S(DB). So need to check stricter locks first. - */ - private final static class LockTypeComparator implements Comparator { - // the higher the integer value, the more restrictive the lock type is - private static final Map orderOfRestrictiveness = new EnumMap<>(LockType.class); - static { - orderOfRestrictiveness.put(LockType.SHARED_READ, 1); - orderOfRestrictiveness.put(LockType.SHARED_WRITE, 2); - orderOfRestrictiveness.put(LockType.EXCL_WRITE, 3); - orderOfRestrictiveness.put(LockType.EXCLUSIVE, 4); - } - @Override - public boolean equals(Object other) { - return this == other; - } - @Override - public int compare(LockType t1, LockType t2) { - Integer t1Restrictiveness = orderOfRestrictiveness.get(t1); - Integer t2Restrictiveness = orderOfRestrictiveness.get(t2); - if (t1Restrictiveness == null) { - throw new RuntimeException("Unexpected LockType: " + t1); - } - if (t2Restrictiveness == null) { - throw new RuntimeException("Unexpected LockType: " + t2); - } - return Integer.compare(t1Restrictiveness, t2Restrictiveness); - } - } - private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING} // A jump table to figure out whether to wait, acquire, @@ -4358,13 +4322,18 @@ is performed on that db (e.g. show tables, created table, etc). EXCLUSIVE on an object may mean it's being dropped or overwritten.*/ String[] whereStr = { // exclusive - " \"REQ\".\"HL_LOCK_TYPE\"='e'" + - " AND NOT (\"EX\".\"HL_TABLE\" IS NULL AND \"EX\".\"HL_LOCK_TYPE\"='r' AND \"REQ\".\"HL_TABLE\" IS NOT NULL)", + " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclusive() + + " AND NOT (\"EX\".\"HL_TABLE\" IS NULL AND \"EX\".\"HL_LOCK_TYPE\"=" + + LockTypeUtil.shared_read() + " AND \"REQ\".\"HL_TABLE\" IS NOT NULL)", // shared-write - " \"REQ\".\"HL_LOCK_TYPE\"='w' AND \"EX\".\"HL_LOCK_TYPE\" IN ('w','e')", + " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.shared_write() + " AND \"EX\".\"HL_LOCK_TYPE\" IN (" + + LockTypeUtil.excl_write() + "," + LockTypeUtil.exclusive() + ")", + // excl-write + " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.excl_write() + " AND \"EX\".\"HL_LOCK_TYPE\"!=" + + LockTypeUtil.shared_read(), // shared-read - " \"REQ\".\"HL_LOCK_TYPE\"='r' AND \"EX\".\"HL_LOCK_TYPE\"='e'" + - " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)", + " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.shared_read() + " AND \"EX\".\"HL_LOCK_TYPE\"=" + + LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)" }; List subQuery = new ArrayList<>(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LockTypeUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LockTypeUtil.java index f928bf781b..edafdc5fba 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LockTypeUtil.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/LockTypeUtil.java @@ -30,7 +30,25 @@ public static String getEncodingAsStr(LockType lockType) { return Character.toString(getEncoding(lockType)); } + private static String getEncodingAsQuotedStr(LockType lockType) { + return "'" + getEncodingAsStr(lockType) + "'"; + } + public static Optional getLockTypeFromEncoding(char encoding) { return Optional.ofNullable(persistenceEncodings.inverse().get(encoding)); } + + public static String exclusive(){ + return getEncodingAsQuotedStr(LockType.EXCLUSIVE); + } + public static String excl_write(){ + return getEncodingAsQuotedStr(LockType.EXCL_WRITE); + } + public static String shared_write(){ + return getEncodingAsQuotedStr(LockType.SHARED_WRITE); + } + public static String shared_read(){ + return getEncodingAsQuotedStr(LockType.SHARED_READ); + } + } -- 2.23.0