diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1dbae40..06fa9c5 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1943,6 +1943,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"), WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s", new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"), + + MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true, + "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" + + "table there is at most 1 matching row in the source table."), // For Druid storage handler HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY", diff --git pom.xml pom.xml index b4c0b81..9afe781 100644 --- pom.xml +++ pom.xml @@ -100,7 +100,7 @@ 2.4 2.4 2.4.3 - 2.19.1 + 2.18.1 2.4 2.8 2.9 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 4fce1ac..86613f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.udf.UDFAtan; import org.apache.hadoop.hive.ql.udf.UDFBase64; import org.apache.hadoop.hive.ql.udf.UDFBin; +import org.apache.hadoop.hive.ql.udf.UDFCardinalityViolation; import org.apache.hadoop.hive.ql.udf.UDFChr; import org.apache.hadoop.hive.ql.udf.UDFConv; import org.apache.hadoop.hive.ql.udf.UDFCos; @@ -464,6 +465,7 @@ system.registerGenericUDF("printf", GenericUDFPrintf.class); system.registerGenericUDF("greatest", GenericUDFGreatest.class); system.registerGenericUDF("least", GenericUDFLeast.class); + 
system.registerUDF("cardinality_violation", UDFCardinalityViolation.class, false); system.registerGenericUDF("from_utc_timestamp", GenericUDFFromUtcTimestamp.class); system.registerGenericUDF("to_utc_timestamp", GenericUDFToUtcTimestamp.class); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index e5d0101..228eb07 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -856,7 +856,7 @@ private static void writeAsText(String text, FSDataOutputStream out) throws IOEx } /** - * Generate a temp table out of a value clause + * Generate a temp table out of a values clause * See also {@link #preProcessForInsert(ASTNode, QB)} */ private ASTNode genValuesTempTable(ASTNode originalFrom, QB qb) throws SemanticException { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 79355ba..d4d8652 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -29,9 +29,12 @@ import java.util.Set; import org.antlr.runtime.TokenRewriteStream; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -690,13 +693,15 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { throw new 
SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); } - + handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText); ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); Context rewrittenCtx = rr.rewrittenCtx; ASTNode rewrittenTree = rr.rewrittenTree; //set dest name mapping on new context - for(int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount(); insClauseIdx++, whenClauseIdx++) { + for(int insClauseIdx = 1, whenClauseIdx = 0; + insClauseIdx < rewrittenTree.getChildCount() - 1/*skip cardinality violation clause*/; + insClauseIdx++, whenClauseIdx++) { //we've added Insert clauses in order of WHEN items in whenClauses ASTNode insertClause = (ASTNode) rewrittenTree.getChild(insClauseIdx); switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) { @@ -810,6 +815,50 @@ private boolean isTargetTable(Entity entity, Table targetTable) { */ return targetTable.equals(entity.getTable()); } + + /** + * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B, + * an error should be raised if > 1 row of "source" matches the same row in "target". + * This should not affect the runtime of the query as it's running in parallel with other + * branches of the multi-insert. It won't actually write any data to merge_tmp_table since the + * cardinality_violation() UDF throws an error whenever it's called killing the query + */ + private void handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target, + String onClauseAsString) throws SemanticException { + if(!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) { + LOG.info("Merge statement cardinality violation check is disabled: " + + HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname); + return; + } + //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a + // given session, i.e.
the name can be fixed across all invocations + String tableName = "merge_tmp_table"; + rewrittenQueryStr.append("\nINSERT INTO ").append(tableName) + .append("\n SELECT cardinality_violation()\n WHERE ").append(onClauseAsString) + .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID") + .append(" HAVING count(*) > 1"); + try { + if (null == db.getTable(tableName, false)) { + StorageFormat format = new StorageFormat(conf); + format.processStorageFormat("TextFile"); + Table table = db.newTable(tableName); + table.setSerializationLib(format.getSerde()); + List fields = new ArrayList(); + fields.add(new FieldSchema("val", "int", null)); + table.setFields(fields); + table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(), + tableName), conf)); + table.getTTable().setTemporary(true); + table.setStoredAsSubDirectories(false); + table.setInputFormatClass(format.getInputFormat()); + table.setOutputFormatClass(format.getOutputFormat()); + db.createTable(table, true); + } + } + catch(HiveException|MetaException e) { + throw new SemanticException(e.getMessage(), e); + } + } /** * @param onClauseAsString - because there is no clone() and we need to use in multiple places * @param deleteExtraPredicate - see notes at caller diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 9e2179c..77ce378 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -106,6 +106,7 @@ public void setUp() throws Exception { hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(); File f = new File(TEST_WAREHOUSE_DIR); @@ -801,6 +802,24 @@ public void 
testMergeDeleteUpdate() throws Exception { int[][] rExpected = {{5,6},{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); } + + /** + * see https://issues.apache.org/jira/browse/HIVE-14949 for details + * @throws Exception + */ + @Test + public void testMergeCardinalityViolation() throws Exception { + int[][] sourceVals = {{2,2},{2,44},{5,5},{11,11}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals)); + int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); + String query = "merge into " + Table.ACIDTBL + + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + + "WHEN MATCHED and s.a < 5 THEN DELETE " + + "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; + runStatementOnDriverNegative(query); + } @Test public void testSetClauseFakeColumn() throws Exception { CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +