diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b3a475478d..7e4dc0c1fe 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2745,6 +2745,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
         "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +
         "table there is at most 1 matching row in the source table per SQL Specification."),
+    MERGE_SPLIT_UPDATE("hive.merge.split.update", false,
+        "If true, SQL Merge statement will handle WHEN MATCHED UPDATE by splitting it into 2\n" +
+        "branches of a multi-insert, representing a delete of the existing row and an insert of\n" +
+        "the new version of the row. Updating bucketing and partitioning columns should\n" +
+        "only be permitted if this is true."),
     OPTIMIZE_ACID_META_COLUMNS("hive.optimize.acid.meta.columns", true,
         "If true, don't decode Acid metadata columns from storage unless" +
         " they are needed."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 18089d59fd..b11d5a167c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -200,7 +200,13 @@ private String getMatchedText(ASTNode n) {
     return getTokenRewriteStream().toString(n.getTokenStartIndex(), n.getTokenStopIndex() + 1).trim();
   }
   /**
-   * The suffix is always relative to a given ASTNode
+   * The suffix is always relative to a given ASTNode.
+   * We need this so that FileSinkOperatorS corresponding to different branches of a multi-insert
+   * statement which represents a SQL Merge statement get marked correctly with
+   * {@link org.apache.hadoop.hive.ql.io.AcidUtils.Operation}. See usages
+   * of {@link #getDestNamePrefix(ASTNode, QB)} and
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#updating(String)} and
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#deleting(String)}.
    */
   public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) {
     assert curNode != null : "must supply curNode";
@@ -255,7 +261,7 @@ public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) {
       case DELETE:
         return DestClausePrefix.DELETE;
       case MERGE:
-      /* This is the structrue expected here
+      /* This is the structure expected here
          HiveParser.TOK_QUERY;
            HiveParser.TOK_FROM
            HiveParser.TOK_INSERT;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 44f7b43be6..db6d5519e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -64,7 +64,7 @@ public void analyze(ASTNode tree) throws SemanticException {
   private static final String INDENT = "  ";
-  private IdentifierQuoter quotedIdenfierHelper;
+  private IdentifierQuoter quotedIdentifierHelper;
   /**
    * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
@@ -112,7 +112,7 @@ private void visit(ASTNode n) {
    * needing to understand what it is (except for QuotedIdentifiers).
    */
   private String getMatchedText(ASTNode n) {
-    quotedIdenfierHelper.visit(n);
+    quotedIdentifierHelper.visit(n);
     return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
         n.getTokenStopIndex() + 1).trim();
   }
@@ -130,7 +130,7 @@ private String getMatchedText(ASTNode n) {
    * @throws SemanticException
    */
   private void analyzeMerge(ASTNode tree) throws SemanticException {
-    quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
+    quotedIdentifierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
     /*
      * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
       For example, given:
@@ -161,7 +161,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
     - VALUES clause must have the same number of values as target table (including partition cols).  Part cols go last
     in Select clause of Insert as Select
      todo: do we care to preserve comments in original SQL?
-     todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent
+     todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
       Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
      todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty?  This should be a runtime error - maybe not
      the outer side of ROJ is empty => the join produces 0
@@ -211,7 +211,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
     if (hasHint) {
       hintStr = " /*+ " + qHint.getText() + " */ ";
     }
-
+    final boolean splitUpdateEarly =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE);
     /**
      * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete
      * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
@@ -233,7 +234,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
       case HiveParser.TOK_UPDATE:
         numWhenMatchedUpdateClauses++;
         String s = handleUpdate(whenClause, rewrittenQueryStr, target,
-            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr,
+            splitUpdateEarly);
         hintProcessed = true;
         if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
           extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
@@ -242,7 +244,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
       case HiveParser.TOK_DELETE:
         numWhenMatchedDeleteClauses++;
         String s1 = handleDelete(whenClause, rewrittenQueryStr, target,
-            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr);
+            onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr, false);
         hintProcessed = true;
         if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
           extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
@@ -271,7 +273,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
     ASTNode rewrittenTree = rr.rewrittenTree;
     rewrittenCtx.setOperation(Context.Operation.MERGE);
-    //set dest name mapping on new context; 1st chid is TOK_FROM
+    //set dest name mapping on new context; 1st child is TOK_FROM
     for (int insClauseIdx = 1, whenClauseIdx = 0;
         insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
         insClauseIdx++, whenClauseIdx++) {
@@ -281,7 +283,21 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
         rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
         break;
       case HiveParser.TOK_UPDATE:
-        rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+        if(!splitUpdateEarly) {
+          rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+        } else {
+          /* With 2 branches for the update, the 1st branch is the INSERT part
+          and the next one is the DELETE.  WriteSet tracking treats 2 concurrent DELETES
+          as in conflict so Lost Update is still prevented, since the delete event lands in the
+          partition/bucket where the original version of the row was, so any concurrent update/delete
+          of the same row will land in the same partition/bucket.
+
+          If the insert part lands in a different partition, it should not conflict with another
+          Update of that partition since that update by definition cannot be of the same row.
+          If we ever enforce unique constraints we may have to have I+I be in conflict*/
+          rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+          rewrittenCtx.addDestNamePrefix(++insClauseIdx, Context.DestClausePrefix.DELETE);
+        }
         break;
       case HiveParser.TOK_DELETE:
         rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
@@ -339,7 +355,7 @@ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTN
     //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a
     // given session, i.e. the name can be fixed across all invocations
     String tableName = "merge_tmp_table";
-    rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
+    rewrittenQueryStr.append("INSERT INTO ").append(tableName)
         .append("\n SELECT cardinality_violation(")
         .append(getSimpleTableName(target)).append(".ROW__ID");
     addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);
@@ -381,22 +397,25 @@ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTN
    * @param deleteExtraPredicate - see notes at caller
    */
   private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target,
-      String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr)
+      String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr, boolean splitUpdateEarly)
       throws SemanticException {
     assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
     assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
     String targetName = getSimpleTableName(target);
     rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
     addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
-    rewrittenQueryStr.append(" -- update clause\n SELECT ");
+    rewrittenQueryStr.append(" -- update clause").append(splitUpdateEarly ? " (insert part)" : "")
+        .append("\n SELECT ");
     if (hintStr != null) {
       rewrittenQueryStr.append(hintStr);
     }
-    rewrittenQueryStr.append(targetName).append(".ROW__ID");
+    if(!splitUpdateEarly) {
+      rewrittenQueryStr.append(targetName).append(".ROW__ID, ");
+    }
     ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
     //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
-    //before reparsing, i.e. they are known to SemanticAnalyzer logic
+    //before re-parsing, i.e. they are known to SemanticAnalyzer logic
     Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable);
     //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end
     //up with
@@ -404,8 +423,11 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri
     //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table
     //names
     List<FieldSchema> nonPartCols = targetTable.getCols();
-    for (FieldSchema fs : nonPartCols) {
-      rewrittenQueryStr.append(", ");
+    for(int i = 0; i < nonPartCols.size(); i++) {
+      FieldSchema fs = nonPartCols.get(i);
+      if(i > 0) {
+        rewrittenQueryStr.append(", ");
+      }
       String name = fs.getName();
       if (setColsExprs.containsKey(name)) {
         String rhsExp = getMatchedText(setColsExprs.get(name));
@@ -435,14 +457,26 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri
     if (deleteExtraPredicate != null) {
       rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
     }
-    rewrittenQueryStr.append("\n SORT BY ");
-    rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+    if(!splitUpdateEarly) {
+      rewrittenQueryStr.append("\n SORT BY ");
+      rewrittenQueryStr.append(targetName).append(".ROW__ID ");
+    }
+    rewrittenQueryStr.append("\n");
     setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
     //we don't deal with columns on RHS of SET expression since the whole expr is part of the
-    //rewritten SQL statement and is thus handled by SemanticAnalzyer.  Nor do we have to
+    //rewritten SQL statement and is thus handled by SemanticAnalyzer.  Nor do we have to
     //figure which cols on RHS are from source and which from target
+    if(splitUpdateEarly) {
+      /**
+       * this is part of the WHEN MATCHED UPDATE, so we ignore any 'extra predicate' generated
+       * by this call to handleDelete()
+       */
+      handleDelete(whenMatchedUpdateClause, rewrittenQueryStr, target, onClauseAsString,
+          targetTable, deleteExtraPredicate, hintStr, true);
+    }
+
     return extraPredicate;
   }
@@ -450,17 +484,23 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri
    * @param onClauseAsString - because there is no clone() and we need to use in multiple places
    * @param updateExtraPredicate - see notes at caller
    */
-  private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
-      String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr)
-      throws SemanticException {
+  private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr,
+      ASTNode target, String onClauseAsString, Table targetTable, String updateExtraPredicate,
+      String hintStr, boolean splitUpdateEarly) throws SemanticException {
     assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
-    assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+    assert (splitUpdateEarly &&
+        getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_UPDATE) ||
+        getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
     List<FieldSchema> partCols = targetTable.getPartCols();
     String targetName = getSimpleTableName(target);
     rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
     addPartitionColsToInsert(partCols, rewrittenQueryStr);
-    rewrittenQueryStr.append(" -- delete clause\n SELECT ");
+    if(splitUpdateEarly) {
rewrittenQueryStr.append(" -- update clause (delete part)\n SELECT "); + } else { + rewrittenQueryStr.append(" -- delete clause\n SELECT "); + } if (hintStr != null) { rewrittenQueryStr.append(hintStr); } @@ -590,8 +630,9 @@ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQ if (extraPredicate != null) { //we have WHEN NOT MATCHED AND THEN INSERT rewrittenQueryStr.append(" AND ") - .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n'); + .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))); } + rewrittenQueryStr.append('\n'); } private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode) diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java index 6caac119e7..33247f0745 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java @@ -221,7 +221,7 @@ protected void markReadEntityForUpdate() { * For updates, we need to set the column access info so that it contains information on * the columns we are updating. * (But not all the columns of the target table even though the rewritten query writes - * all columns of target table since that is an implmentation detail). + * all columns of target table since that is an implementation detail). */ protected void setUpAccessControlInfoForUpdate(Table mTable, Map setCols) { ColumnAccessInfo cai = new ColumnAccessInfo(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index adce54cf27..8f6d1e479a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7971,8 +7971,16 @@ private WriteEntity generateTableWriteEntity(String dest, Table dest_tab, if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) { output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest)); if (!outputs.add(output)) { - throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES - .getMsg(dest_tab.getTableName())); + if(!((this instanceof MergeSemanticAnalyzer) && + conf.getBoolVar(ConfVars.MERGE_SPLIT_UPDATE))) { + /** + * Merge stmt with early split update may create several (2) writes to the same + * table with the same {@link WriteType}, e.g. 
if original Merge stmt has both update and + * delete clauses, and update is split into insert + delete, in which case it's not an + * error*/ + throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES + .getMsg(dest_tab.getTableName())); + } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index c2b931a26b..33c25ed1ba 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -970,13 +970,29 @@ public void testMergeOnTezEdges() throws Exception { sb.append(s).append('\n'); } LOG.info("Explain1: " + sb); + /* + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 2 (SIMPLE_EDGE) + Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 6 <- Reducer 2 (SIMPLE_EDGE) + Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + */ for(int i = 0; i < explain.size(); i++) { if(explain.get(i).contains("Edges:")) { - Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)")); - Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)")); - Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)")); - Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)")); - Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)")); + Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), + explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)")); + Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), + explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)")); + Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), + explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)")); + Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), + explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)")); + Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), + explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (SIMPLE_EDGE)")); + Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), + explain.get(i + 6).contains("Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)")); break; } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 6f44e9bfc5..0734ed959c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -147,6 +147,7 @@ void setUpWithTableProperties(String tableProperties) throws Exception { .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true); hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); //TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version //of these tests. 
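[Note: to make the rewrite above concrete, here is a sketch of the multi-insert that a Merge
statement turns into with hive.merge.split.update=true. Table and column names are illustrative
only, and the exact text MergeSemanticAnalyzer generates differs in quoting, comments and
predicate form. Given:

    MERGE INTO target t USING source s ON t.a = s.a
    WHEN MATCHED THEN UPDATE SET b = s.b
    WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b);

the WHEN MATCHED UPDATE branch becomes an insert branch carrying the new version of the row plus
a delete branch carrying the old ROW__ID; these are the two branches that the
addDestNamePrefix(insClauseIdx, INSERT) / addDestNamePrefix(++insClauseIdx, DELETE) pair in
Context marks as INSERT and DELETE destinations:

    FROM target t RIGHT OUTER JOIN source s ON t.a = s.a
    INSERT INTO target    -- update clause (insert part)
      SELECT t.a, s.b WHERE t.a = s.a
    INSERT INTO target    -- update clause (delete part)
      SELECT t.ROW__ID WHERE t.a = s.a SORT BY t.ROW__ID
    INSERT INTO target    -- insert clause
      SELECT s.a, s.b WHERE t.a IS NULL;

When hive.merge.cardinality.check=true, handleCardinalityViolation() appends one more branch of
roughly this shape, which fails at runtime if any target ROW__ID matched more than one source row:

    INSERT INTO merge_tmp_table
      SELECT cardinality_violation(t.ROW__ID)
      WHERE t.a = s.a GROUP BY t.ROW__ID HAVING count(*) > 1;]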
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index dc2963df84..5f39fdccb5 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -100,6 +100,7 @@ void setUpInternal() throws Exception {
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
             "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
     hiveConf.setBoolean("mapred.input.dir.recursive", true);
     TxnDbUtil.setConfValues(hiveConf);
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 5e085f84af..43a3047f87 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
+import org.junit.ComparisonFailure;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -1529,7 +1530,7 @@ public void testWriteSetTracking10() throws Exception {
       3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
   }
   /**
-   * Concurrent delete/detele of same partition - should pass
+   * Concurrent delete/delete of same partition - should NOT pass
    */
   @Test
   public void testWriteSetTracking11() throws Exception {
@@ -1584,18 +1585,27 @@ public void testWriteSetTracking11() throws Exception {
         Collections.singletonList("p=two"));
     adp.setOperationType(DataOperationType.DELETE);
     txnHandler.addDynamicPartitions(adp);
-    txnMgr.commitTxn();//"select * from tab1" txn
-
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-        1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-        1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-        1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
-    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-        1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
+    LockException expectedException = null;
+    try {
+      txnMgr.commitTxn();//"select * from tab1" txn
+    }
+    catch(LockException ex) {
+      expectedException = ex;
+    }
+    Assert.assertNotNull("Didn't get expected d/d conflict", expectedException);
+    Assert.assertEquals("Transaction manager has aborted the transaction txnid:5. " +
+        "Reason: Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two " +
+        "committed by [txnid:4,5] d/d", expectedException.getMessage());
+    Assert.assertEquals("WRITE_SET mismatch: " +
+        TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
+        1, TxnDbUtil.countQueryAgent(conf,
+            "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
+    Assert.assertEquals("WRITE_SET mismatch: " +
+        TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
+        0, TxnDbUtil.countQueryAgent(conf,
+            "select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
     Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
-        4, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+        3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
   }
   @Test
   public void testCompletedTxnComponents() throws Exception {
@@ -1706,8 +1716,8 @@ private void testMerge3Way(boolean cc) throws Exception {
       "(9,10,1,2), (3,4,1,2), (11,12,1,3), (5,13,1,3), (7,8,2,2), (14,15,1,1)"));
     checkCmdOnDriver(driver.run("create table source2 (a int, b int, p int, q int)"));
     checkCmdOnDriver(driver.run("insert into source2 values " +
-      //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2
-      "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
+        //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2
+        "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
@@ -1729,7 +1739,7 @@ private void testMerge3Way(boolean cc) throws Exception {
     swapTxnManager(txnMgr2);
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
       "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
-      "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+      "when matched and t.a in (" + (cc ? "3,7" : "11, 13") + ") then delete " + //if cc deletes from p=1/q=2, p=2/q=2, else delete nothing
       "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1
     long txnId2 = txnMgr2.getCurrentTxnId();
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false);
@@ -1824,10 +1834,12 @@ private void testMerge3Way(boolean cc) throws Exception {
         Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2"));//update clause
     adp.setOperationType(DataOperationType.UPDATE);
     txnHandler.addDynamicPartitions(adp);
-    adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
-        Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
-    adp.setOperationType(DataOperationType.DELETE);
-    txnHandler.addDynamicPartitions(adp);
+    if(cc) {
+      adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
+          Arrays.asList("p=1/q=2", "p=2/q=2"));//delete clause
+      adp.setOperationType(DataOperationType.DELETE);
+      txnHandler.addDynamicPartitions(adp);
+    }
     adp = new AddDynamicPartitions(txnId2, writeId, "default", "target",
         Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
     adp.setOperationType(DataOperationType.INSERT);
@@ -1841,7 +1853,7 @@ private void testMerge3Way(boolean cc) throws Exception {
     Assert.assertEquals(
       "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
         TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"),
-      2,
+      (cc ? 2 : 0),
       TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
         " and tc_operation_type='d'"));
     Assert.assertEquals(
@@ -1860,9 +1872,18 @@ private void testMerge3Way(boolean cc) throws Exception {
     }
     if(cc) {
       Assert.assertNotNull("didn't get exception", expectedException);
-      Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
-        "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
-        "committed by [txnid:10,11] u/u", expectedException.getMessage());
+      try {
+        Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
+          "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
+          "committed by [txnid:10,11] u/u", expectedException.getMessage());
+      }
+      catch(ComparisonFailure ex) {
+        //the 2 txns have 2 conflicts between them so check for either failure since which one is
+        //reported (among the 2) is not deterministic
+        Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
+          "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=2 " +
+          "committed by [txnid:10,11] d/d", expectedException.getMessage());
+      }
       Assert.assertEquals(
         "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
@@ -1879,7 +1900,7 @@ private void testMerge3Way(boolean cc) throws Exception {
       Assert.assertEquals(
         "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from COMPLETED_TXN_COMPONENTS"),
-        6,
+        4,
        TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
      Assert.assertEquals(
        "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
@@ -1890,7 +1911,7 @@ private void testMerge3Way(boolean cc) throws Exception {
       Assert.assertEquals(
         "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from WRITE_SET"),
-        2,
+        0,
         TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
           " and ws_operation_type='d'"));
     }
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7520922e37..6df7680a7c 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1174,9 +1174,17 @@ public void commitTxn(CommitTxnRequest rqst)
             " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
             // part of this commitTxn() op
             " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
-            //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
-            " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) +
-            " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")"));
+            //U+U, U+D and D+D are conflicts and we don't currently track I in WRITE_SET at all.
+            //It may seem like D+D should not be in conflict, but consider 2 multi-stmt txns
+            //where each does "delete X" + "insert X", where X is a row with the same PK.  This is
+            //equivalent to an update of X but won't be in conflict unless D+D is in conflict.
+            //The same happens when Hive splits U=I+D early so it looks like 2 branches of a
+            //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
+            //un-serializable to allow concurrent deletes.
+            " and (committed.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) +
+            ", " + quoteChar(OperationType.DELETE.sqlConst) +
+            ") AND cur.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " +
+            quoteChar(OperationType.DELETE.sqlConst) + "))"));
       if (rs.next()) {
         //found a conflict
         String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";