diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9f59d4cea3..64ff3232d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -165,6 +165,7 @@ private WmContext wmContext;
   private boolean isExplainPlan = false;
+  private boolean isMerge = false;
   private PlanMapper planMapper = new PlanMapper();
   private StatsSource statsSource;
   private int executionIndex;
@@ -1194,6 +1195,14 @@ public void setExplainPlan(boolean t) {
     this.isExplainPlan = t;
   }
 
+  public boolean isMerge() {
+    return isMerge;
+  }
+
+  public void setMerge(boolean isMerge) {
+    this.isMerge = isMerge;
+  }
+
   public void setExplainConfig(ExplainConfiguration explainConfig) {
     this.explainConfig = explainConfig;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index c1f94d165b..06b987b2f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -372,7 +374,8 @@ private JSONObject getLocks(PrintStream out, ExplainWork work) {
     if (jsonOutput) {
       out = null;
     }
-    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(),
+        Optional.ofNullable(work.getParseContext()).map(ParseContext::getContext).orElse(null), conf);
     if (null != out) {
       out.print("LOCK INFORMATION:\n");
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 998c05e37d..a60a7967be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -2921,11 +2922,12 @@ private static boolean isLockableTable(Table t) {
    * @return list with lock components
    */
   public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
-      HiveConf conf) {
+      Context ctx, HiveConf conf) {
     List<LockComponent> lockComponents = new ArrayList<>();
     boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
     boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);
     boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
+    boolean isMerge = (ctx != null) && ctx.isMerge();
 
     // For each source to read, get a shared_read lock
     for (ReadEntity input : inputs) {
@@ -3040,9 +3042,17 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
       assert t != null;
       if (AcidUtils.isTransactionalTable(t)) {
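+        // Note (added commentary, not part of the original patch): a MERGE
+        // whose WHEN NOT MATCHED branch inserts into the target must not run
+        // concurrently with another writer, otherwise both transactions can
+        // pass the NOT MATCHED check and insert duplicate rows without a
+        // detectable write-write conflict; hence the lock escalation below.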
         if (sharedWrite) {
-          compBuilder.setSharedWrite();
+          if (!isMerge) {
+            compBuilder.setSharedWrite();
+          } else {
+            compBuilder.setExclWrite();
+          }
         } else {
-          compBuilder.setSharedRead();
+          if (!isMerge) {
+            compBuilder.setSharedRead();
+          } else {
+            compBuilder.setExclusive();
+          }
         }
       } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
         final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index deaab89c1f..26b7f476c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -421,7 +421,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
       LOG.debug("No locks needed for queryId=" + queryId);
       return null;
     }
-    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf);
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), ctx, conf);
     lockComponents.addAll(getGlobalLocks(ctx.getConf()));
 
     //It's possible there's nothing to lock even if we have w/r entities.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 3ffdcec528..7edbf269c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -59,6 +59,7 @@ public void analyze(ASTNode tree) throws SemanticException {
       throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
           "MergeSemanticAnalyzer");
     }
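+    // Note (added commentary, not part of the original patch): flag the
+    // analysis context as a MERGE so that AcidUtils.makeLockComponents()
+    // can pick the stronger lock type for the target table.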
+    ctx.setMerge(true);
     analyzeMerge(tree);
   }
 
@@ -272,6 +273,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
     Context rewrittenCtx = rr.rewrittenCtx;
     ASTNode rewrittenTree = rr.rewrittenTree;
     rewrittenCtx.setOperation(Context.Operation.MERGE);
+    rewrittenCtx.setMerge(true);
 
     //set dest name mapping on new context; 1st child is TOK_FROM
     for (int insClauseIdx = 1, whenClauseIdx = 0;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index b435e79c3c..a5c783101e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -54,11 +54,11 @@ public static void setUpDB() throws Exception{
 
   @Before
   public void setUp() throws Exception {
-    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     SessionState.start(conf);
     ctx = new Context(conf);
     driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
     driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     TxnDbUtil.cleanDb(conf);
     SessionState ss = SessionState.get();
     ss.initTxnMgr(conf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 1687425bcb..d5e1421634 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2189,6 +2189,94 @@ private void testMergeUnpartitioned(boolean causeConflict, boolean sharedWrite)
     }
   }
 
+  @Test
+  public void testInsertMergeInsertLocking() throws Exception {
+    testMergeInsertLocking(false);
+  }
+  @Test
+  public void testInsertMergeInsertLockingSharedWrite() throws Exception {
+    testMergeInsertLocking(true);
+  }
+
+  private void testMergeInsertLocking(boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond("insert into target values (5, 6)");
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    driver.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+    txnMgr2.acquireLocks(driver.getPlan(), driver.getContext(), "T2", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ),
+        LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks);
+    checkLock((sharedWrite ? LockType.EXCL_WRITE : LockType.EXCLUSIVE),
+        LockState.WAITING, "default", "target", null, locks);
+  }
+
+  @Test
+  public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", false);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSharedWriteNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", true);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", false);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentSharedWriteNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", true);
+  }
+
+  private void testConcurrentMergeInsertNoDuplicates(String query, boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond(query);
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    driver2.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+
+    swapTxnManager(txnMgr);
+    driver.run();
+
+    //merge should notice snapshot changes and re-create it
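+    //(added commentary, not part of the original patch: driver2 blocks on the
+    // target table lock until the first transaction commits; on wake-up the
+    // driver detects the outdated snapshot and re-compiles before writing,
+    // which is why no duplicate rows are expected below)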
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    swapTxnManager(txnMgr);
+    driver.run("select * from target");
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals("Duplicate records found", 4, res.size());
+  }
+
   /**
    * Check that DP with partial spec properly updates TXN_COMPONENTS
    */
diff --git a/ql/src/test/results/clientpositive/llap/explain_locks.q.out b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
index d62f6ccafd..fb795dfd80 100644
--- a/ql/src/test/results/clientpositive/llap/explain_locks.q.out
+++ b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
@@ -150,7 +150,7 @@ default.target.p=1/q=3 -> EXCL_WRITE
 default.target.p=1/q=3 -> EXCL_WRITE
 default.target.p=1/q=2 -> EXCL_WRITE
 default.target.p=1/q=2 -> EXCL_WRITE
-default.target -> SHARED_READ
+default.target -> EXCLUSIVE
 PREHOOK: query: explain locks update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@source
@@ -248,4 +248,4 @@ default.target.p=1/q=3 -> SHARED_WRITE
 default.target.p=1/q=3 -> SHARED_WRITE
 default.target.p=1/q=2 -> SHARED_WRITE
 default.target.p=1/q=2 -> SHARED_WRITE
-default.target -> SHARED_WRITE
+default.target -> EXCL_WRITE