diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9f59d4cea3..5e92980841 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -181,6 +181,10 @@ public void setOperation(Operation operation) {
     this.operation = operation;
   }
 
+  public Operation getOperation() {
+    return operation;
+  }
+
   public WmContext getWmContext() {
     return wmContext;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index c1f94d165b..750abcb6a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -49,6 +50,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -58,6 +60,7 @@
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -372,7 +375,12 @@ private JSONObject getLocks(PrintStream out, ExplainWork work) {
     if (jsonOutput) {
       out = null;
     }
-    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
+    Operation operation = Optional.of(work).map(ExplainWork::getParseContext)
+        .map(ParseContext::getContext).map(Context::getOperation)
+        .orElse(Operation.OTHER);
+
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(),
+        operation, conf);
     if (null != out) {
       out.print("LOCK INFORMATION:\n");
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 998c05e37d..30a0132d3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -2921,11 +2922,13 @@ private static boolean isLockableTable(Table t) {
    * @return list with lock components
    */
   public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
-      HiveConf conf) {
+      Context.Operation operation, HiveConf conf) {
+
     List<LockComponent> lockComponents = new ArrayList<>();
     boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
     boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);
     boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
+    boolean isMerge = operation == Context.Operation.MERGE;
 
     // For each source to read, get a shared_read lock
     for (ReadEntity input : inputs) {
@@ -3040,9 +3043,17 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
       assert t != null;
       if (AcidUtils.isTransactionalTable(t)) {
         if (sharedWrite) {
-          compBuilder.setSharedWrite();
+          if (!isMerge) {
+            compBuilder.setSharedWrite();
+          } else {
+            compBuilder.setExclWrite();
+          }
         } else {
-          compBuilder.setSharedRead();
+          if (!isMerge) {
+            compBuilder.setSharedRead();
+          } else {
+            compBuilder.setExclusive();
+          }
         }
       } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
         final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index deaab89c1f..71afcbdc68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -421,7 +421,8 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
       LOG.debug("No locks needed for queryId=" + queryId);
       return null;
     }
-    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf);
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(),
+        ctx.getOperation(), conf);
     lockComponents.addAll(getGlobalLocks(ctx.getConf()));
 
     //It's possible there's nothing to lock even if we have w/r entities.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 3ffdcec528..cea7b11324 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -59,6 +59,7 @@ public void analyze(ASTNode tree) throws SemanticException {
       throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
           "MergeSemanticAnalyzer");
     }
+    ctx.setOperation(Context.Operation.MERGE);
     analyzeMerge(tree);
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index b435e79c3c..a5c783101e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -54,11 +54,11 @@ public static void setUpDB() throws Exception{
 
   @Before
   public void setUp() throws Exception {
-    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     SessionState.start(conf);
     ctx = new Context(conf);
     driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
     driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     TxnDbUtil.cleanDb(conf);
     SessionState ss = SessionState.get();
     ss.initTxnMgr(conf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 1687425bcb..d5e1421634 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2189,6 +2189,94 @@ private void testMergeUnpartitioned(boolean causeConflict, boolean sharedWrite)
     }
   }
 
+  @Test
+  public void testInsertMergeInsertLocking() throws Exception {
+    testMergeInsertLocking(false);
+  }
+  @Test
+  public void testInsertMergeInsertLockingSharedWrite() throws Exception {
+    testMergeInsertLocking(true);
+  }
+
+  private void testMergeInsertLocking(boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond("insert into target values (5, 6)");
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    driver.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+    txnMgr2.acquireLocks(driver.getPlan(), driver.getContext(), "T2", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ),
+        LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks);
+    checkLock((sharedWrite ? LockType.EXCL_WRITE : LockType.EXCLUSIVE),
+        LockState.WAITING, "default", "target", null, locks);
+  }
+
+  @Test
+  public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", false);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSharedWriteNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", true);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", false);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentSharedWriteNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", true);
+  }
+
+  private void testConcurrentMergeInsertNoDuplicates(String query, boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond(query);
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    driver2.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+
+    swapTxnManager(txnMgr);
+    driver.run();
+
+    // merge should notice the snapshot change and re-create it
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    swapTxnManager(txnMgr);
+    driver.run("select * from target");
+    List res = new ArrayList();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals("Duplicate records found", 4, res.size());
+  }
+
   /**
    * Check that DP with partial spec properly updates TXN_COMPONENTS
    */
diff --git a/ql/src/test/results/clientpositive/llap/explain_locks.q.out b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
index d62f6ccafd..fb795dfd80 100644
--- a/ql/src/test/results/clientpositive/llap/explain_locks.q.out
+++ b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
@@ -150,7 +150,7 @@ default.target.p=1/q=3 -> EXCL_WRITE
 default.target.p=1/q=3 -> EXCL_WRITE
 default.target.p=1/q=2 -> EXCL_WRITE
 default.target.p=1/q=2 -> EXCL_WRITE
-default.target -> SHARED_READ
+default.target -> EXCLUSIVE
 PREHOOK: query: explain locks update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@source
@@ -248,4 +248,4 @@ default.target.p=1/q=3 -> SHARED_WRITE
 default.target.p=1/q=3 -> SHARED_WRITE
 default.target.p=1/q=2 -> SHARED_WRITE
 default.target.p=1/q=2 -> SHARED_WRITE
-default.target -> SHARED_WRITE
+default.target -> EXCL_WRITE
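
Reviewer note: the net effect of the AcidUtils change above is easiest to read as a decision table over two booleans: sharedWrite (HiveConf.ConfVars.TXN_WRITE_X_LOCK set to false) and isMerge (the statement compiled as Context.Operation.MERGE). The sketch below restates that branch as a minimal, self-contained illustration only; MergeLockRuleSketch, its local LockType enum, and writeLockFor are stand-ins invented for this note, not Hive API. The real patch drives a LockComponentBuilder via setSharedWrite()/setExclWrite()/setSharedRead()/setExclusive().

// Illustrative sketch only: mirrors the lock-selection branch added to
// AcidUtils.makeLockComponents for transactional write targets.
public class MergeLockRuleSketch {
  enum LockType { SHARED_READ, SHARED_WRITE, EXCL_WRITE, EXCLUSIVE }

  /**
   * @param sharedWrite true when HiveConf.ConfVars.TXN_WRITE_X_LOCK is false
   * @param isMerge     true when the statement compiled as Context.Operation.MERGE
   */
  static LockType writeLockFor(boolean sharedWrite, boolean isMerge) {
    if (sharedWrite) {
      // Shared-write mode: MERGE escalates to EXCL_WRITE so two concurrent
      // "WHEN NOT MATCHED THEN INSERT" branches cannot both miss each other's rows.
      return isMerge ? LockType.EXCL_WRITE : LockType.SHARED_WRITE;
    }
    // Default (exclusive-write) mode: plain writers keep SHARED_READ here,
    // while MERGE escalates to EXCLUSIVE on the target table.
    return isMerge ? LockType.EXCLUSIVE : LockType.SHARED_READ;
  }

  public static void main(String[] args) {
    System.out.println(writeLockFor(false, true));  // EXCLUSIVE
    System.out.println(writeLockFor(true, true));   // EXCL_WRITE
    System.out.println(writeLockFor(false, false)); // SHARED_READ
    System.out.println(writeLockFor(true, false));  // SHARED_WRITE
  }
}

This is exactly what the two explain_locks.q.out golden-file changes show: the MERGE target escalates from SHARED_READ to EXCLUSIVE in the default mode, and from SHARED_WRITE to EXCL_WRITE in shared-write mode, closing the window in which two concurrent merge-inserts could each miss the other's rows and insert duplicates.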