diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java index 0e6d903..fb1a309 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java @@ -42,7 +42,6 @@ private transient ASTNode rootNode; private transient boolean isValidASTStr; private transient boolean visited = false; - transient String matchedText; public ASTNode() { } @@ -347,12 +346,4 @@ private String toStringTree(ASTNode rootNode) { return rootNode.getMemoizedSubString(startIndx, endIndx); } - - /** - * The string that generated this node. - * Only set for a node if parser grammar sets it for a particular rule - */ - public String getMatchedText() { - return matchedText; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g index f8adb38..a423e6a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g @@ -142,7 +142,7 @@ tableAlias fromSource @init { gParent.pushMsg("from source", state); } -@after { $fromSource.tree.matchedText = $fromSource.text; gParent.popMsg(state); } +@after { gParent.popMsg(state); } : (LPAREN KW_VALUES) => fromSource0 | (LPAREN) => LPAREN joinSource RPAREN -> joinSource @@ -278,7 +278,7 @@ searchCondition // INSERT INTO (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c) valueRowConstructor @init { gParent.pushMsg("value row constructor", state); } -@after { $valueRowConstructor.tree.matchedText = $valueRowConstructor.text; gParent.popMsg(state); } +@after { gParent.popMsg(state); } : LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+) ; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 1868dda..d50d839 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -382,7 +382,7 @@ intervalQualifiers expression @init { gParent.pushMsg("expression specification", state); } -@after { $expression.tree.matchedText = $expression.text; gParent.popMsg(state); } +@after { gParent.popMsg(state); } : precedenceOrExpression ; @@ -459,7 +459,6 @@ precedencePlusOperator ; precedencePlusExpression -@after { $precedencePlusExpression.tree.matchedText = $precedencePlusExpression.text; } : precedenceStarExpression (precedencePlusOperator^ precedenceStarExpression)* ; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 027eb68..05e1f43 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -21,20 +21,20 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.antlr.runtime.TokenRewriteStream; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConfUtil; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -555,12 +555,59 @@ private static String normalizeColName(String colName) { return colName.toLowerCase(); } - //todo: see SubQueryDiagnostic for some ideas on turning ASTNode into SQL - //todo: should we add MERGE to AcidUtils.Operation instead? that will be a lot of code clean up private enum Operation {UPDATE, DELETE, MERGE, NOT_ACID}; private Operation currentOperation = Operation.NOT_ACID; private static final String Indent = " "; + private IdentifierQuoter quotedIdenfierHelper; + + /** + * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it. + * Since HiveLexer.g is written such that it strips away any ` (back ticks) around + * quoted identifiers we need to add those back to generated SQL. + * Additionally, the parser only produces tokens of type Identifier and never + * QuotedIdentifier (HIVE-6013). So here we just quote all identifiers. + * (') around String literals are retained w/o issues + */ + private static class IdentifierQuoter { + private final TokenRewriteStream trs; + private final IdentityHashMap visitedNodes = new IdentityHashMap<>(); + IdentifierQuoter(TokenRewriteStream trs) { + this.trs = trs; + if(trs == null) { + throw new IllegalArgumentException("Must have a TokenRewriteStream"); + } + } + private void visit(ASTNode n) { + if(n.getType() == HiveParser.Identifier) { + if(visitedNodes.containsKey(n)) { + /** + * Since we are modifying the stream, it's not idempotent. Ideally, the caller would take + * care to only quote Identifiers in each subtree once, but this makes it safe + */ + return; + } + visitedNodes.put(n, n); + trs.insertBefore(n.getToken(), "`"); + trs.insertAfter(n.getToken(), "`"); + } + if(n.getChildCount() <= 0) {return;} + for(Node c : n.getChildren()) { + visit((ASTNode)c); + } + } + } + + /** + * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without + * needing to understand what it is (except for QuotedIdentifiers) + * + */ + private String getMatchedText(ASTNode n) { + quotedIdenfierHelper.visit(n); + return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(), + n.getTokenStopIndex() + 1).trim(); + } /** * Here we take a Merge statement AST and generate a semantically equivalent multi-insert * statement to exectue. Each Insert leg represents a single WHEN clause. As much as possible, @@ -575,6 +622,7 @@ private static String normalizeColName(String colName) { */ private void analyzeMerge(ASTNode tree) throws SemanticException { currentOperation = Operation.MERGE; + quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream()); /* * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST For example, given: @@ -605,7 +653,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) todo: do we care to preserve comments in original SQL? todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse... - todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens hwen source is empty? This should be a runtime error - maybe not + todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty? This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0 rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error */ ASTNode target = (ASTNode)tree.getChild(0); @@ -613,6 +661,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) String targetName = getSimpleTableName(target); String sourceName = getSimpleTableName(source); ASTNode onClause = (ASTNode) tree.getChild(2); + String onClauseAsText = getMatchedText(onClause); Table targetTable = getTargetTable(target); validateTargetTable(targetTable); @@ -627,7 +676,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) rewrittenQueryStr.append(Indent).append(chooseJoinType(whenClauses)).append("\n"); if(source.getType() == HiveParser.TOK_SUBQUERY) { //this includes the mandatory alias - rewrittenQueryStr.append(Indent).append(source.getMatchedText()); + rewrittenQueryStr.append(Indent).append(getMatchedText(source)); } else { rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(source)); @@ -636,7 +685,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) } } rewrittenQueryStr.append('\n'); - rewrittenQueryStr.append(Indent).append("ON ").append(onClause.getMatchedText()).append('\n'); + rewrittenQueryStr.append(Indent).append("ON ").append(onClauseAsText).append('\n'); /** * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete @@ -649,18 +698,18 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) for(ASTNode whenClause : whenClauses) { switch (getWhenClauseOperation(whenClause).getType()) { case HiveParser.TOK_INSERT: - handleInsert(whenClause, rewrittenQueryStr, target, onClause, targetTable, targetName); + handleInsert(whenClause, rewrittenQueryStr, target, onClause, targetTable, targetName, onClauseAsText); break; case HiveParser.TOK_UPDATE: numWhenMatchedUpdateClauses++; - String s = handleUpdate(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate); + String s = handleUpdate(whenClause, rewrittenQueryStr, target, onClauseAsText, targetTable, extraPredicate); if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { extraPredicate = s;//i.e. it's the 1st WHEN MATCHED } break; case HiveParser.TOK_DELETE: numWhenMatchedDeleteClauses++; - String s1 = handleDelete(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate); + String s1 = handleDelete(whenClause, rewrittenQueryStr, target, onClauseAsText, targetTable, extraPredicate); if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { extraPredicate = s1;//i.e. it's the 1st WHEN MATCHED } @@ -793,7 +842,7 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri String targetName = getSimpleTableName(target); rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr); - rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID"); + rewrittenQueryStr.append(" -- update clause\n select ").append(targetName).append(".ROW__ID"); ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0); //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions @@ -807,10 +856,16 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri rewrittenQueryStr.append(", "); String name = fs.getName(); if (setColsExprs.containsKey(name)) { - rewrittenQueryStr.append(setColsExprs.get(name).getMatchedText()); + String rhsExp = getMatchedText(setColsExprs.get(name)); + //"set a=5, b=8" - rhsExp picks up the next char (e.g. ',') from the token stream + switch (rhsExp.charAt(rhsExp.length() - 1)) { + case ',': + case '\n': + rhsExp = rhsExp.substring(0, rhsExp.length() - 1); + } + rewrittenQueryStr.append(rhsExp); } else { - //todo: is this the right way to get
. for target? rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf)); } } @@ -847,7 +902,7 @@ private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewri rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); addPartitionColsToInsert(partCols, rewrittenQueryStr); - rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID "); + rewrittenQueryStr.append(" -- delete clause\n select ").append(targetName).append(".ROW__ID "); addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName); rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause); @@ -966,7 +1021,7 @@ private String getWhenClausePredicate(ASTNode whenClause) { throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause); } if(whenClause.getChildCount() == 2) { - return ((ASTNode)whenClause.getChild(1)).getMatchedText(); + return getMatchedText((ASTNode)whenClause.getChild(1)); } return null; } @@ -977,26 +1032,26 @@ private String getWhenClausePredicate(ASTNode whenClause) { */ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target, ASTNode onClause, Table targetTable, - String targetTableNameInSourceQuery) throws SemanticException{ + String targetTableNameInSourceQuery, String onClauseAsString) throws SemanticException { assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED; assert getWhenClauseOperation(whenNotMatchedClause).getType() == HiveParser.TOK_INSERT; List partCols = targetTable.getPartCols(); + String valuesClause = getMatchedText((ASTNode)getWhenClauseOperation(whenNotMatchedClause).getChild(0)); + valuesClause = valuesClause.substring(1, valuesClause.length() - 1);//strip '(' and ')' - String valuesClause = ((ASTNode)getWhenClauseOperation(whenNotMatchedClause).getChild(0)) - .getMatchedText(); - valuesClause = valuesClause.substring(1, valuesClause.length() - 1); rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); addPartitionColsToInsert(partCols, rewrittenQueryStr); - OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery); + OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery, + conf, onClauseAsString); oca.analyze(); - rewrittenQueryStr.append("\n select ") + rewrittenQueryStr.append(" -- insert clause\n select ") .append(valuesClause).append("\n WHERE ").append(oca.getPredicate()); String extraPredicate = getWhenClausePredicate(whenNotMatchedClause); if(extraPredicate != null) { //we have WHEN NOT MATCHED AND THEN INSERT rewrittenQueryStr.append(" AND ") - .append(((ASTNode)whenNotMatchedClause.getChild(1)).getMatchedText()).append('\n'); + .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n'); } } /** @@ -1010,7 +1065,7 @@ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQ * we know that target is always a table (as opposed to some derived table). * The job of this class is to generate this predicate. * - * Note that is thi predicate cannot simply be NOT(on-clause-expr). IF on-clause-expr evaluates + * Note that is this predicate cannot simply be NOT(on-clause-expr). IF on-clause-expr evaluates * to Unknown, it will be treated as False in the WHEN MATCHED Inserts but NOT(Unknown) = Unknown, * and so it will be False for WHEN NOT MATCHED Insert... */ @@ -1021,14 +1076,19 @@ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQ private final List allTargetTableColumns = new ArrayList<>(); private final Set tableNamesFound = new HashSet<>(); private final String targetTableNameInSourceQuery; + private final HiveConf conf; + private final String onClauseAsString; /** * @param targetTableNameInSourceQuery alias or simple name */ - OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery) { + OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery, + HiveConf conf, String onClauseAsString) { this.onClause = onClause; allTargetTableColumns.addAll(targetTable.getCols()); allTargetTableColumns.addAll(targetTable.getPartCols()); this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery); + this.conf = conf; + this.onClauseAsString = onClauseAsString; } /** * finds all columns and groups by table ref (if there is one) @@ -1040,7 +1100,7 @@ private void visit(ASTNode n) { //the ref must be a table, so look for column name as right child of DOT if(parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) { //I don't think this can happen... but just in case - throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClause.getMatchedText()); + throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClauseAsString); } addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText()); } @@ -1058,15 +1118,14 @@ private void visit(ASTNode n) { } private void analyze() { visit(onClause); - int numTableRefs = tableNamesFound.size(); if(tableNamesFound.size() > 2) { throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " + - tableNamesFound + " in " + onClause.getMatchedText()); + tableNamesFound + " in " + onClauseAsString); } handleUnresolvedColumns(); if(tableNamesFound.size() > 2) { throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " + - "Found " + tableNamesFound + " in " + onClause.getMatchedText()); + "Found " + tableNamesFound + " in " + onClauseAsString); } } /** @@ -1109,7 +1168,7 @@ private String getPredicate() { sb.append(" AND "); } //but preserve table name in SQL - sb.append(targetTableNameInSourceQuery).append(".").append(col).append(" IS NULL"); + sb.append(HiveUtils.unparseIdentifier(targetTableNameInSourceQuery, conf)).append(".").append(HiveUtils.unparseIdentifier(col, conf)).append(" IS NULL"); } return sb.toString(); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index fd9959e..1626e2e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -602,9 +602,15 @@ public void testMergeNegative2() throws Exception { "\nWHEN MATCHED THEN UPDATE set b=a"); Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)cpr.getException()).getCanonicalErrorMsg()); } - @Ignore + + /** + * `1` means 1 is a column name and '1' means 1 is a string literal + * HiveConf.HIVE_QUOTEDID_SUPPORT + * HiveConf.HIVE_SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES + * {@link TestTxnCommands#testMergeType2SCD01()} + */ @Test - public void testSpecialChar() throws Exception { + public void testQuotedIdentifier() throws Exception { String target = "`aci/d_u/ami`"; String src = "`src/name`"; runStatementOnDriver("drop table if exists " + target); @@ -612,11 +618,152 @@ public void testSpecialChar() throws Exception { runStatementOnDriver("create table " + target + "(i int," + "`d?*de e` decimal(5,2)," + "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))"); - runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h` " + + runStatementOnDriver("create table " + src + "(gh int, j decimal(5,2), k varchar(128))"); + runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + + "\nwhen matched and i > 5 then delete " + + "\nwhen matched then update set vc='blah' " + + "\nwhen not matched then insert values(1,2.1,'baz')"); + runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + "\nwhen matched and i > 5 then delete " + - "\nwhen matched then update set vc=`∆∋` " + - "\nwhen not matched then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)"); + "\nwhen matched then update set vc='blah', `d?*de e` = current_timestamp() " + + "\nwhen not matched then insert values(1,2.1, concat('baz', current_timestamp()))"); + runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + + "\nwhen matched and i > 5 then delete " + + "\nwhen matched then update set vc='blah' " + + "\nwhen not matched then insert values(1,2.1,'a\\b')"); + runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " + + "\nwhen matched and i > 5 then delete " + + "\nwhen matched then update set vc='∆∋'" + + "\nwhen not matched then insert values(`a/b`.gh,`a/b`.j,'c\\t')"); + } + @Test + public void testQuotedIdentifier2() throws Exception { + String target = "`aci/d_u/ami`"; + String src = "`src/name`"; + runStatementOnDriver("drop table if exists " + target); + runStatementOnDriver("drop table if exists " + src); + runStatementOnDriver("create table " + target + "(i int," + + "`d?*de e` decimal(5,2)," + + "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))"); + runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" + + "\nwhen matched and `g/h` > 5 then delete " + + "\nwhen matched and `g/h` < 0 then update set vc='∆∋', `d?*de e` = `d?*de e` * j + 1" + + "\nwhen not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)"); + runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" + + "\nwhen matched and `g/h` > 5 then delete" + + "\n when matched and `g/h` < 0 then update set vc='∆∋' , `d?*de e` = `d?*de e` * j + 1 " + + "\n when not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)"); + } + /** + * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy + * also test QuotedIdentifier inside source expression + * {@link TestTxnCommands#testQuotedIdentifier()} + * {@link TestTxnCommands#testQuotedIdentifier2()} + */ + @Test + public void testMergeType2SCD01() throws Exception { + runStatementOnDriver("drop table if exists target"); + runStatementOnDriver("drop table if exists source"); + runStatementOnDriver("drop table if exists splitTable"); + + runStatementOnDriver("create table splitTable(op int)"); + runStatementOnDriver("insert into splitTable values (0),(1)"); + runStatementOnDriver("create table source (key int, data int)"); + runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}}; + runStatementOnDriver("insert into target " + makeValuesClause(targetVals)); + int[][] sourceVals = {{1, 7}, {3, 8}}; + runStatementOnDriver("insert into source " + makeValuesClause(sourceVals)); + //augment source with a col which has 1 if it will cause an update in target, 0 otherwise + String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key"; + //split each row (duplicate) which will cause an update into 2 rows and augment with 'op' col which has 0 to insert, 1 to update + String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end `o/p\\n` from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1"; + if(false) { + //this is just for debug + List r1 = runStatementOnDriver(curMatch); + List r2 = runStatementOnDriver(teeCurMatch); + } + String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " + + "when matched then update set cur=0 " + + "when not matched then insert values(s.key,s.data,1)"; + + runStatementOnDriver(stmt); + int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; + List r = runStatementOnDriver("select * from target order by key,data,cur"); + Assert.assertEquals(stringifyValues(resultVals), r); + } + /** + * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy + * Same as testMergeType2SCD01 but with a more intuitive "source" expression + */ + @Test + public void testMergeType2SCD02() throws Exception { + runStatementOnDriver("drop table if exists target"); + runStatementOnDriver("drop table if exists source"); + runStatementOnDriver("create table source (key int, data int)"); + runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}}; + runStatementOnDriver("insert into target " + makeValuesClause(targetVals)); + int[][] sourceVals = {{1, 7}, {3, 8}}; + runStatementOnDriver("insert into source " + makeValuesClause(sourceVals)); + + String baseSrc = "select source.*, 0 c from source " + + "union all " + + "select source.*, 1 c from source " + + "inner join target " + + "on source.key=target.key where target.cur=1"; + if(false) { + //this is just for debug + List r1 = runStatementOnDriver(baseSrc); + List r2 = runStatementOnDriver( + "select t.*, s.* from target t right outer join (" + baseSrc + ") s " + + "\non t.key=s.key and t.cur=s.c and t.cur=1"); + } + String stmt = "merge into target t using " + + "(" + baseSrc + ") s " + + "on t.key=s.key and t.cur=s.c and t.cur=1 " + + "when matched then update set cur=0 " + + "when not matched then insert values(s.key,s.data,1)"; + + runStatementOnDriver(stmt); + int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; + List r = runStatementOnDriver("select * from target order by key,data,cur"); + Assert.assertEquals(stringifyValues(resultVals), r); + } + @Test + public void testMergeUpdateDelete() throws Exception { + int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd)); + int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); + String query = "merge into " + Table.ACIDTBL + + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + + "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + + "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " + + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; + runStatementOnDriver(query); + + List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), r); + } + @Test + public void testMergeDeleteUpdate() throws Exception { + int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals)); + int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); + String query = "merge into " + Table.ACIDTBL + + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + + "WHEN MATCHED and s.a < 5 THEN DELETE " + + "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; + runStatementOnDriver(query); + + List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + int[][] rExpected = {{5,6},{7,8},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), r); } @Test public void testSetClauseFakeColumn() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 9145cf3..5932d7e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1623,114 +1623,6 @@ public void testMergeAliasedTarget() throws Exception { int[][] rExpected = {{2,0},{4,0},{5,0},{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); } - @Test - public void testMergeUpdateDelete() throws Exception { - int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}}; - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd)); - int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); - String query = "merge into " + Table.ACIDTBL + - " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + - "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + - "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " + - "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; - runStatementOnDriver(query); - - List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}}; - Assert.assertEquals(stringifyValues(rExpected), r); - } - @Test - public void testMergeDeleteUpdate() throws Exception { - int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}}; - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals)); - int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); - String query = "merge into " + Table.ACIDTBL + - " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + - "WHEN MATCHED and s.a < 5 THEN DELETE " + - "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + - "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; - runStatementOnDriver(query); - - List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{5,6},{7,8},{11,11}}; - Assert.assertEquals(stringifyValues(rExpected), r); - } - - /** - * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy - */ - @Test - public void testMergeType2SCD01() throws Exception { - runStatementOnDriver("drop table if exists target"); - runStatementOnDriver("drop table if exists source"); - runStatementOnDriver("drop table if exists splitTable"); - - runStatementOnDriver("create table splitTable(op int)"); - runStatementOnDriver("insert into splitTable values (0),(1)"); - runStatementOnDriver("create table source (key int, data int)"); - runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}}; - runStatementOnDriver("insert into target " + makeValuesClause(targetVals)); - int[][] sourceVals = {{1, 7}, {3, 8}}; - runStatementOnDriver("insert into source " + makeValuesClause(sourceVals)); - //augment source with a col which has 1 if it will cause an update in target, 0 otherwise - String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key"; - //split each row (duplicate) which will cause an update into 2 rows and augment with 'op' col which has 0 to insert, 1 to update - String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end op from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1"; - if(false) { - //this is just for debug - List r1 = runStatementOnDriver(curMatch); - List r2 = runStatementOnDriver(teeCurMatch); - } - String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.op=1 " + - "when matched then update set cur=0 " + - "when not matched then insert values(s.key,s.data,1)"; - - runStatementOnDriver(stmt); - int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; - List r = runStatementOnDriver("select * from target order by key,data,cur"); - Assert.assertEquals(stringifyValues(resultVals), r); - } - /** - * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy - * Same as testMergeType2SCD01 but with a more intuitive "source" expression - */ - @Test - public void testMergeType2SCD02() throws Exception { - runStatementOnDriver("drop table if exists target"); - runStatementOnDriver("drop table if exists source"); - runStatementOnDriver("create table source (key int, data int)"); - runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}}; - runStatementOnDriver("insert into target " + makeValuesClause(targetVals)); - int[][] sourceVals = {{1, 7}, {3, 8}}; - runStatementOnDriver("insert into source " + makeValuesClause(sourceVals)); - - String baseSrc = "select source.*, 0 c from source " + - "union all " + - "select source.*, 1 c from source " + - "inner join target " + - "on source.key=target.key where target.cur=1"; - if(false) { - //this is just for debug - List r1 = runStatementOnDriver(baseSrc); - List r2 = runStatementOnDriver( - "select t.*, s.* from target t right outer join (" + baseSrc + ") s " + - "\non t.key=s.key and t.cur=s.c and t.cur=1"); - } - String stmt = "merge into target t using " + - "(" + baseSrc + ") s " + - "on t.key=s.key and t.cur=s.c and t.cur=1 " + - "when matched then update set cur=0 " + - "when not matched then insert values(s.key,s.data,1)"; - - runStatementOnDriver(stmt); - int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; - List r = runStatementOnDriver("select * from target order by key,data,cur"); - Assert.assertEquals(stringifyValues(resultVals), r); - } @Test @Ignore("Values clause with table constructor not yet supported") diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java index c4dead8..67e917c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java @@ -540,10 +540,4 @@ public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } - @Test - @Ignore - public void testMergeType2SCD01() throws Exception {} - @Test - @Ignore - public void testMergeType2SCD02() throws Exception {} }