diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b3a475478d..7e4dc0c1fe 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2745,6 +2745,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true, "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" + "table there is at most 1 matching row in the source table per SQL Specification."), + MERGE_SPLIT_UPDATE("hive.merge.split.update", false, + "If true, SQL Merge statement will handle WHEN MATCHED UPDATE by splitting it into 2\n" + + "branches of a multi-insert, representing delete of existing row and an insert of\n" + + "the new version of the row. Updating bucketing and partitioning columns should\n" + + "only be permitted if this is true."), OPTIMIZE_ACID_META_COLUMNS("hive.optimize.acid.meta.columns", true, "If true, don't decode Acid metadata columns from storage unless" + " they are needed."), diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java index 18089d59fd..b11d5a167c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -200,7 +200,13 @@ private String getMatchedText(ASTNode n) { return getTokenRewriteStream().toString(n.getTokenStartIndex(), n.getTokenStopIndex() + 1).trim(); } /** - * The suffix is always relative to a given ASTNode + * The suffix is always relative to a given ASTNode. + * We need this so that FileSinkOperatorS corresponding to different branches of a multi-insert + * statement which represents a SQL Merge statement get marked correctly with + * {@link org.apache.hadoop.hive.ql.io.AcidUtils.Operation}. See usages + * of {@link #getDestNamePrefix(ASTNode, QB)} and + * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#updating(String)} and + * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#deleting(String)}. */ public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) { assert curNode != null : "must supply curNode"; @@ -255,7 +261,7 @@ public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) { case DELETE: return DestClausePrefix.DELETE; case MERGE: - /* This is the structrue expected here + /* This is the structure expected here HiveParser.TOK_QUERY; HiveParser.TOK_FROM HiveParser.TOK_INSERT; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java index 44f7b43be6..db6d5519e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java @@ -64,7 +64,7 @@ public void analyze(ASTNode tree) throws SemanticException { private static final String INDENT = " "; - private IdentifierQuoter quotedIdenfierHelper; + private IdentifierQuoter quotedIdentifierHelper; /** * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it. @@ -112,7 +112,7 @@ private void visit(ASTNode n) { * needing to understand what it is (except for QuotedIdentifiers). 
*/ private String getMatchedText(ASTNode n) { - quotedIdenfierHelper.visit(n); + quotedIdentifierHelper.visit(n); return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(), n.getTokenStopIndex() + 1).trim(); } @@ -130,7 +130,7 @@ private String getMatchedText(ASTNode n) { * @throws SemanticException */ private void analyzeMerge(ASTNode tree) throws SemanticException { - quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream()); + quotedIdentifierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream()); /* * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST For example, given: @@ -161,7 +161,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last in Select clause of Insert as Select todo: do we care to preserve comments in original SQL? - todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent + todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse... todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty? This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0 @@ -211,7 +211,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) if (hasHint) { hintStr = " /*+ " + qHint.getText() + " */ "; } - + final boolean splitUpdateEarly = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE); /** * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete * If we have both update and delete, the 1st one (in SQL code) must have "AND " @@ -233,7 +234,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) case HiveParser.TOK_UPDATE: numWhenMatchedUpdateClauses++; String s = handleUpdate(whenClause, rewrittenQueryStr, target, - onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr); + onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr, + splitUpdateEarly); hintProcessed = true; if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { extraPredicate = s; //i.e. it's the 1st WHEN MATCHED @@ -242,7 +244,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) case HiveParser.TOK_DELETE: numWhenMatchedDeleteClauses++; String s1 = handleDelete(whenClause, rewrittenQueryStr, target, - onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr); + onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr, false); hintProcessed = true; if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED @@ -271,7 +273,7 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) ASTNode rewrittenTree = rr.rewrittenTree; rewrittenCtx.setOperation(Context.Operation.MERGE); - //set dest name mapping on new context; 1st chid is TOK_FROM + //set dest name mapping on new context; 1st child is TOK_FROM for (int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount() - (validating ? 
1 : 0/*skip cardinality violation clause*/); insClauseIdx++, whenClauseIdx++) { @@ -281,7 +283,21 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT); break; case HiveParser.TOK_UPDATE: - rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE); + if(!splitUpdateEarly) { + rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE); + } else { + /* With 2 branches for the update, the 1st branch is the INSERT part + and the next one is the DELETE. WriteSet tracking treats 2 concurrent DELETES + as in conflict so Lost Update is still prevented since the delete event lands in the + partition/bucket where the original version of the row was so any concurrent update/delete + of the same row will land in the same partition/bucket. + + If the insert part lands in a different partition, it should not conflict with another + Update of that partition since that update by definition cannot be of the same row. + If we ever enforce unique constraints we may have to have I+I be in conflict*/ + rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT); + rewrittenCtx.addDestNamePrefix(++insClauseIdx, Context.DestClausePrefix.DELETE); + } break; case HiveParser.TOK_DELETE: rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE); @@ -339,7 +355,7 @@ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTN //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a // given session, i.e. the name can be fixed across all invocations String tableName = "merge_tmp_table"; - rewrittenQueryStr.append("\nINSERT INTO ").append(tableName) + rewrittenQueryStr.append("INSERT INTO ").append(tableName) .append("\n SELECT cardinality_violation(") .append(getSimpleTableName(target)).append(".ROW__ID"); addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); @@ -381,22 +397,25 @@ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTN * @param deleteExtraPredicate - see notes at caller */ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target, - String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr) + String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr, boolean splitUpdateEarly) throws SemanticException { assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED; assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE; String targetName = getSimpleTableName(target); rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr); - rewrittenQueryStr.append(" -- update clause\n SELECT "); + rewrittenQueryStr.append(" -- update clause").append(splitUpdateEarly ? "(insert part)": "") + .append("\n SELECT "); if (hintStr != null) { rewrittenQueryStr.append(hintStr); } - rewrittenQueryStr.append(targetName).append(".ROW__ID"); + if(!splitUpdateEarly) { + rewrittenQueryStr.append(targetName).append(".ROW__ID, "); + } ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0); //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions - //before reparsing, i.e. 
they are known to SemanticAnalyzer logic + //before re-parsing, i.e. they are known to SemanticAnalyzer logic Map setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable); //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end //up with @@ -404,8 +423,11 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table //names List nonPartCols = targetTable.getCols(); - for (FieldSchema fs : nonPartCols) { - rewrittenQueryStr.append(", "); + for(int i = 0; i < nonPartCols.size(); i++) { + FieldSchema fs = nonPartCols.get(i); + if(i > 0) { + rewrittenQueryStr.append(", "); + } String name = fs.getName(); if (setColsExprs.containsKey(name)) { String rhsExp = getMatchedText(setColsExprs.get(name)); @@ -435,14 +457,26 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri if (deleteExtraPredicate != null) { rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")"); } - rewrittenQueryStr.append("\n SORT BY "); - rewrittenQueryStr.append(targetName).append(".ROW__ID \n"); + if(!splitUpdateEarly) { + rewrittenQueryStr.append("\n SORT BY "); + rewrittenQueryStr.append(targetName).append(".ROW__ID "); + } + rewrittenQueryStr.append("\n"); setUpAccessControlInfoForUpdate(targetTable, setColsExprs); //we don't deal with columns on RHS of SET expression since the whole expr is part of the - //rewritten SQL statement and is thus handled by SemanticAnalzyer. Nor do we have to + //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to //figure which cols on RHS are from source and which from target + if(splitUpdateEarly) { + /** + * this is part of the WHEN MATCHED UPDATE, so we ignore any 'extra predicate' generated + * by this call to handleDelete() + */ + handleDelete(whenMatchedUpdateClause, rewrittenQueryStr, target, onClauseAsString, + targetTable, deleteExtraPredicate, hintStr, true); + } + return extraPredicate; } @@ -450,17 +484,23 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri * @param onClauseAsString - because there is no clone() and we need to use in multiple places * @param updateExtraPredicate - see notes at caller */ - private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target, - String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr) - throws SemanticException { + private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, + ASTNode target, String onClauseAsString, Table targetTable, String updateExtraPredicate, + String hintStr, boolean splitUpdateEarly) throws SemanticException { assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED; - assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE; + assert (splitUpdateEarly && + getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_UPDATE) || + getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE; List partCols = targetTable.getPartCols(); String targetName = getSimpleTableName(target); rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); addPartitionColsToInsert(partCols, rewrittenQueryStr); - rewrittenQueryStr.append(" -- delete clause\n SELECT "); + if(splitUpdateEarly) { + 
rewrittenQueryStr.append(" -- update clause (delete part)\n SELECT "); + } else { + rewrittenQueryStr.append(" -- delete clause\n SELECT "); + } if (hintStr != null) { rewrittenQueryStr.append(hintStr); } @@ -590,8 +630,9 @@ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQ if (extraPredicate != null) { //we have WHEN NOT MATCHED AND THEN INSERT rewrittenQueryStr.append(" AND ") - .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n'); + .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))); } + rewrittenQueryStr.append('\n'); } private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode) diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java index 6caac119e7..33247f0745 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java @@ -221,7 +221,7 @@ protected void markReadEntityForUpdate() { * For updates, we need to set the column access info so that it contains information on * the columns we are updating. * (But not all the columns of the target table even though the rewritten query writes - * all columns of target table since that is an implmentation detail). + * all columns of target table since that is an implementation detail). */ protected void setUpAccessControlInfoForUpdate(Table mTable, Map setCols) { ColumnAccessInfo cai = new ColumnAccessInfo(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index adce54cf27..8f6d1e479a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7971,8 +7971,16 @@ private WriteEntity generateTableWriteEntity(String dest, Table dest_tab, if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) { output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest)); if (!outputs.add(output)) { - throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES - .getMsg(dest_tab.getTableName())); + if(!((this instanceof MergeSemanticAnalyzer) && + conf.getBoolVar(ConfVars.MERGE_SPLIT_UPDATE))) { + /** + * Merge stmt with early split update may create several (2) writes to the same + * table with the same {@link WriteType}, e.g. 
if the original Merge stmt has both update and
+ * delete clauses, and the update is split into insert + delete, in which case it's not an
+ * error*/
+ throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+ .getMsg(dest_tab.getTableName()));
+ } } }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c2b931a26b..33c25ed1ba 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -970,13 +970,29 @@ public void testMergeOnTezEdges() throws Exception {
 sb.append(s).append('\n'); } LOG.info("Explain1: " + sb);
+ /*
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 6 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ */
 for(int i = 0; i < explain.size(); i++) { if(explain.get(i).contains("Edges:")) {
- Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)"));
- Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1),
+ explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2),
+ explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3),
+ explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4),
+ explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5),
+ explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (SIMPLE_EDGE)"));
+ Assert.assertTrue("At i+1=" + (i+6) + explain.get(i + 6),
+ explain.get(i + 6).contains("Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
 break; } }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f44e9bfc5..0734ed959c 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -147,6 +147,7 @@ void setUpWithTableProperties(String tableProperties) throws Exception {
 .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
 "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
 hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
 hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
 //TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version
 //of these tests.
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index dc2963df84..5f39fdccb5 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -100,6 +100,7 @@ void setUpInternal() throws Exception {
 .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
 "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
 hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE, true);
 hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
 hiveConf.setBoolean("mapred.input.dir.recursive", true);
 TxnDbUtil.setConfValues(hiveConf);
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index a17abfc089..a4721d3914 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1174,9 +1174,17 @@ public void commitTxn(CommitTxnRequest rqst)
 " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
 // part of this commitTxn() op
 " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
- //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
- " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")"));
+ //U+U, U+D and D+D are all conflicts and we don't currently track I in WRITE_SET at all
+ //it may seem like D+D should not be in conflict but consider 2 multi-stmt txns
+ //where each does "delete X + insert X", where X is a row with the same PK. This is
+ //equivalent to an update of X but won't be in conflict unless D+D is in conflict.
+ //The same happens when Hive splits U=I+D early so it looks like 2 branches of a
+ //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
+ // un-serializable to allow concurrent deletes
+ " and (committed.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) +
+ ", " + quoteChar(OperationType.DELETE.sqlConst) +
+ ") AND cur.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " +
+ quoteChar(OperationType.DELETE.sqlConst) + "))"));
 if (rs.next()) {
 //found a conflict
 String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
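
For reference, a rough, illustrative sketch of the rewrite that hive.merge.split.update enables. The target/source tables, the id/b columns and the exact text below are hypothetical (the real statement is generated by MergeSemanticAnalyzer and also carries a cardinality-violation branch when hive.merge.cardinality.check=true). The point is that a single WHEN MATCHED ... UPDATE clause becomes two branches of the multi-insert, an insert of the new row version and a delete of the old one keyed by ROW__ID, which is why analyzeMerge() now registers an INSERT dest-name prefix followed by a DELETE prefix for one TOK_UPDATE clause:

  -- hypothetical source statement:
  --   MERGE INTO target t USING source s ON t.id = s.id
  --   WHEN MATCHED THEN UPDATE SET b = s.b
  --   WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.b)
  -- approximate rewrite with hive.merge.split.update=true:
  FROM target t RIGHT OUTER JOIN source s ON t.id = s.id
  INSERT INTO target    -- update clause(insert part)
    SELECT t.id, s.b WHERE t.id = s.id
  INSERT INTO target    -- update clause (delete part)
    SELECT t.ROW__ID WHERE t.id = s.id
    SORT BY t.ROW__ID
  INSERT INTO target    -- insert clause
    SELECT s.id, s.b WHERE t.id IS NULL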
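
The TxnHandler change widens commit-time conflict detection to match (inserts are still not tracked in WRITE_SET at all). As I read the new predicate, any update/delete overlap between a committed write set and the committing one on the same partition/bucket is now a conflict:

  committed \ committing |  UPDATE  |  DELETE
  -----------------------+----------+----------
  UPDATE                 | conflict | conflict
  DELETE                 | conflict | conflict

That is what keeps the split update safe: its delete branch lands in the partition/bucket of the original row version, so a concurrent update or delete of the same row collides there even if the insert branch lands in a different partition.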