diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ceb322..03cdaba 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1949,6 +1949,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"),
     WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
         new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
+
+    MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
+      "Set to true to have each SQL Merge statement verify that for each row in the target\n" +
+      "table there is at most 1 matching row in the source table."),
 
     // For Druid storage handler
     HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
diff --git pom.xml pom.xml
index 0a55c7c..80d481b 100644
--- pom.xml
+++ pom.xml
@@ -101,7 +101,7 @@
     2.4
     2.4
     2.4.3
-    2.19.1
+    2.18.1
     2.4
     2.8
     2.9
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 6f01da0..c83df7d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hive.ql.udf.UDFAtan;
 import org.apache.hadoop.hive.ql.udf.UDFBase64;
 import org.apache.hadoop.hive.ql.udf.UDFBin;
+import org.apache.hadoop.hive.ql.udf.UDFCardinalityViolation;
 import org.apache.hadoop.hive.ql.udf.UDFChr;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
 import org.apache.hadoop.hive.ql.udf.UDFCos;
@@ -463,6 +464,7 @@
     system.registerGenericUDF("printf", GenericUDFPrintf.class);
     system.registerGenericUDF("greatest", GenericUDFGreatest.class);
     system.registerGenericUDF("least", GenericUDFLeast.class);
+    system.registerUDF("cardinality_violation", UDFCardinalityViolation.class, false);
 
     system.registerGenericUDF("from_utc_timestamp", GenericUDFFromUtcTimestamp.class);
     system.registerGenericUDF("to_utc_timestamp", GenericUDFToUtcTimestamp.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f275f6a..474792f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -853,7 +853,7 @@ private static void writeAsText(String text, FSDataOutputStream out) throws IOEx
   }
 
   /**
-   * Generate a temp table out of a value clause
+   * Generate a temp table out of a values clause
    * See also {@link #preProcessForInsert(ASTNode, QB)}
    */
  private ASTNode genValuesTempTable(ASTNode originalFrom, QB qb) throws SemanticException {
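The new hive.merge.cardinality.check property defaults to true, and cardinality_violation() is registered as an ordinary (non-deterministic) UDF, so the check is active unless a user turns it off. A minimal sketch of toggling the property from a HiveQL session; only the property name and its default come from the HiveConf change above:

    -- Disable the SQL MERGE cardinality check for the current session (the default is true).
    SET hive.merge.cardinality.check=false;
    -- Re-enable it.
    SET hive.merge.cardinality.check=true;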
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 79355ba..d4d8652 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -29,9 +29,12 @@
 import java.util.Set;
 
 import org.antlr.runtime.TokenRewriteStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -690,13 +693,15 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
     if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
       throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
     }
-
+    handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText);
     ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
     Context rewrittenCtx = rr.rewrittenCtx;
     ASTNode rewrittenTree = rr.rewrittenTree;
 
     //set dest name mapping on new context
-    for(int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount(); insClauseIdx++, whenClauseIdx++) {
+    for(int insClauseIdx = 1, whenClauseIdx = 0;
+        insClauseIdx < rewrittenTree.getChildCount() - 1/*skip cardinality violation clause*/;
+        insClauseIdx++, whenClauseIdx++) {
       //we've added Insert clauses in order or WHEN items in whenClauses
       ASTNode insertClause = (ASTNode) rewrittenTree.getChild(insClauseIdx);
       switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
@@ -810,6 +815,50 @@ private boolean isTargetTable(Entity entity, Table targetTable) {
      */
     return targetTable.equals(entity.getTable());
   }
+
+  /**
+   * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B,
+   * an error should be raised if > 1 row of "source" matches the same row in "target".
+   * This should not affect the runtime of the query as it's running in parallel with other
+   * branches of the multi-insert.  It won't actually write any data to merge_tmp_table since the
+   * cardinality_violation() UDF throws an error whenever it's called, killing the query.
+   */
+  private void handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target,
+                                          String onClauseAsString) throws SemanticException {
+    if(!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) {
+      LOG.info("Merge statement cardinality violation check is disabled: " +
+        HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname);
+      return;
+    }
+    //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a
+    // given session, i.e. the name can be fixed across all invocations
+    String tableName = "merge_tmp_table";
+    rewrittenQueryStr.append("\nINSERT INTO ").append(tableName)
+      .append("\n SELECT cardinality_violation()\n WHERE ").append(onClauseAsString)
+      .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID")
+      .append(" HAVING count(*) > 1");
+    try {
+      if (null == db.getTable(tableName, false)) {
+        StorageFormat format = new StorageFormat(conf);
+        format.processStorageFormat("TextFile");
+        Table table = db.newTable(tableName);
+        table.setSerializationLib(format.getSerde());
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("val", "int", null));
+        table.setFields(fields);
+        table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(),
+          tableName), conf));
+        table.getTTable().setTemporary(true);
+        table.setStoredAsSubDirectories(false);
+        table.setInputFormatClass(format.getInputFormat());
+        table.setOutputFormatClass(format.getOutputFormat());
+        db.createTable(table, true);
+      }
+    }
+    catch(HiveException|MetaException e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
+  }
   /**
    * @param onClauseAsString - because there is no clone() and we need to use in multiple places
    * @param deleteExtraPredicate - see notes at caller
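handleCardinalityViolation() appends one extra branch to the multi-insert that a MERGE statement is rewritten into. As a sketch only, reconstructed from the append() calls above rather than captured from a real run, the branch generated for an ON clause of t.a = s.a (as in the test added below) would look roughly like:

    INSERT INTO merge_tmp_table
      SELECT cardinality_violation()
      WHERE t.a = s.a
      GROUP BY t.ROW__ID
      HAVING count(*) > 1

merge_tmp_table itself is a session-scoped temporary TextFile table with a single int column, created on demand if it does not already exist; it never actually receives any rows because cardinality_violation() throws before anything is written.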
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/UDFCardinalityViolation.java ql/src/java/org/apache/hadoop/hive/ql/udf/UDFCardinalityViolation.java
new file mode 100644
index 0000000..e40402f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDFCardinalityViolation.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.IntWritable;
+
+import java.util.Random;
+
+@Description(name = "cardinality_violation",
+    value = "_FUNC_() - raises Cardinality Violation")
+@UDFType(deterministic = false)
+//@VectorizedExpressions({FuncRandNoSeed.class, FuncRand.class})
+public class UDFCardinalityViolation extends UDF {
+  public UDFCardinalityViolation() {
+  }
+
+  public IntWritable evaluate() {
+    throw new RuntimeException("Cardinality Violation in Merge statement");
+  }
+}
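Because the UDF throws unconditionally, any group that survives the HAVING count(*) > 1 filter aborts the whole statement, which is what the testMergeCardinalityViolation test added below relies on: its source rows {2,2} and {2,44} both join to the single target row with a = 2, so one target ROW__ID matches two source rows. A hedged diagnostic sketch of that situation, using hypothetical table names acid_target and orc_source in place of the test's Table.ACIDTBL and Table.NONACIDORCTBL:

    -- One group (the target row with a = 2) has count(*) = 2, so the MERGE is rejected.
    SELECT t.ROW__ID, count(*)
    FROM acid_target t JOIN orc_source s ON t.a = s.a
    GROUP BY t.ROW__ID
    HAVING count(*) > 1;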
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 9bfcc82..ef74449 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -106,6 +106,7 @@ public void setUp() throws Exception {
     hiveConf
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
             "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.prepDb();
     File f = new File(TEST_WAREHOUSE_DIR);
@@ -790,6 +791,24 @@ public void testMergeDeleteUpdate() throws Exception {
     int[][] rExpected = {{5,6},{7,8},{11,11}};
     Assert.assertEquals(stringifyValues(rExpected), r);
   }
+
+  /**
+   * see https://issues.apache.org/jira/browse/HIVE-14949 for details
+   * @throws Exception
+   */
+  @Test
+  public void testMergeCardinalityViolation() throws Exception {
+    int[][] sourceVals = {{2,2},{2,44},{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+    String query = "merge into " + Table.ACIDTBL +
+      " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+      "WHEN MATCHED and s.a < 5 THEN DELETE " +
+      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+    runStatementOnDriverNegative(query);
+  }
   @Test
   public void testSetClauseFakeColumn() throws Exception {
     CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +