diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1dbae40..06fa9c5 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1943,6 +1943,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"), WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s", new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"), + + MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true, + "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" + + "table there is at most 1 matching row in the source table."), // For Druid storage handler HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY", diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 4fce1ac..1fea864 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -464,6 +464,7 @@ system.registerGenericUDF("printf", GenericUDFPrintf.class); system.registerGenericUDF("greatest", GenericUDFGreatest.class); system.registerGenericUDF("least", GenericUDFLeast.class); + system.registerGenericUDF("cardinality_violation", GenericUDFCardinalityViolation.class); system.registerGenericUDF("from_utc_timestamp", GenericUDFFromUtcTimestamp.class); system.registerGenericUDF("to_utc_timestamp", GenericUDFToUtcTimestamp.class); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index e5d0101..228eb07 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -856,7 +856,7 @@ private static void writeAsText(String text, FSDataOutputStream out) throws IOEx } /** - * Generate a temp table out of a value clause + * Generate a temp table out of a values clause * See also {@link #preProcessForInsert(ASTNode, QB)} */ private ASTNode genValuesTempTable(ASTNode originalFrom, QB qb) throws SemanticException { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 79355ba..f102786 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -29,9 +29,12 @@ import java.util.Set; import org.antlr.runtime.TokenRewriteStream; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -127,16 +130,19 @@ private void addPartitionColsToInsert(List partCols, StringBuilder /** * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2 * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2... - * @param targetName simple target table name (i.e. name or alias) + * @param target target table */ - private void addPartitionColsToSelect(List partCols, StringBuilder rewrittenQueryStr, String targetName) { + private void addPartitionColsToSelect(List partCols, StringBuilder rewrittenQueryStr, + ASTNode target) throws SemanticException { + String targetName = target != null ? 
getSimpleTableName(target) : null; + // If the table is partitioned, we need to select the partition columns as well. if (partCols != null) { for (FieldSchema fschema : partCols) { rewrittenQueryStr.append(", "); //would be nice if there was a way to determine if quotes are needed if(targetName != null) { - rewrittenQueryStr.append(HiveUtils.unparseIdentifier(targetName, this.conf)).append('.'); + rewrittenQueryStr.append(targetName).append('.'); } rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf)); } @@ -690,13 +696,15 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); } - + handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText, targetTable); ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); Context rewrittenCtx = rr.rewrittenCtx; ASTNode rewrittenTree = rr.rewrittenTree; //set dest name mapping on new context - for(int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount(); insClauseIdx++, whenClauseIdx++) { + for(int insClauseIdx = 1, whenClauseIdx = 0; + insClauseIdx < rewrittenTree.getChildCount() - 1/*skip cardinality violation clause*/; + insClauseIdx++, whenClauseIdx++) { //we've added Insert clauses in order or WHEN items in whenClauses ASTNode insertClause = (ASTNode) rewrittenTree.getChild(insClauseIdx); switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) { @@ -810,6 +818,61 @@ private boolean isTargetTable(Entity entity, Table targetTable) { */ return targetTable.equals(entity.getTable()); } + + /** + * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B, + * an error should be raised if > 1 row of "source" matches the same row in "target". 
+ This should not affect the runtime of the query as it's running in parallel with other + branches of the multi-insert. It won't actually write any data to merge_tmp_table since the + cardinality_violation() UDF throws an error whenever it's called, killing the query + */ + private void handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target, + String onClauseAsString, Table targetTable) + throws SemanticException { + if(!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) { + LOG.info("Merge statement cardinality violation check is disabled: " + + HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname); + return; + } + //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a + // given session, i.e. the name can be fixed across all invocations + String tableName = "merge_tmp_table"; + rewrittenQueryStr.append("\nINSERT INTO ").append(tableName) + .append("\n SELECT cardinality_violation(") + .append(getSimpleTableName(target)).append(".ROW__ID"); + addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); + + rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString) + .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID"); + + addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); + + rewrittenQueryStr.append(" HAVING count(*) > 1"); + //say table T has partition p, we are generating + //select cardinality_violation(ROW__ID, p) WHERE ...
GROUP BY ROW__ID, p + //the Group By args are passed to cardinality_violation to add the violating value to the error msg + try { + if (null == db.getTable(tableName, false)) { + StorageFormat format = new StorageFormat(conf); + format.processStorageFormat("TextFile"); + Table table = db.newTable(tableName); + table.setSerializationLib(format.getSerde()); + List fields = new ArrayList(); + fields.add(new FieldSchema("val", "int", null)); + table.setFields(fields); + table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(), + tableName), conf)); + table.getTTable().setTemporary(true); + table.setStoredAsSubDirectories(false); + table.setInputFormatClass(format.getInputFormat()); + table.setOutputFormatClass(format.getOutputFormat()); + db.createTable(table, true); + } + } + catch(HiveException|MetaException e) { + throw new SemanticException(e.getMessage(), e); + } + } /** * @param onClauseAsString - because there is no clone() and we need to use in multiple places * @param deleteExtraPredicate - see notes at caller @@ -849,7 +912,7 @@ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewri rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf)); } } - addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, targetName); + addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause); if(extraPredicate != null) { @@ -883,7 +946,7 @@ private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewri addPartitionColsToInsert(partCols, rewrittenQueryStr); rewrittenQueryStr.append(" -- delete clause\n select ").append(targetName).append(".ROW__ID "); - addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName); + addPartitionColsToSelect(partCols, rewrittenQueryStr, 
target); rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause); if(extraPredicate != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFCardinalityViolation.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFCardinalityViolation.java new file mode 100644 index 0000000..0724ff4e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFCardinalityViolation.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.ArrayList; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector; +import org.apache.logging.log4j.core.layout.StringBuilderEncoder; + +/** + * GenericUDFCardinalityViolation. + * + */ +@Description(name = "cardinality_violation", + value = "_FUNC_(n0, n1...)
- raises Cardinality Violation") +public class GenericUDFCardinalityViolation extends GenericUDF { + private transient Converter[] converters; + private transient ArrayList ret = new ArrayList(); + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { + return PrimitiveObjectInspectorFactory.javaIntObjectInspector; + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + StringBuilder nonUniqueKey = new StringBuilder(); + for(DeferredObject t : arguments) { + if(nonUniqueKey.length() > 0) {nonUniqueKey.append(','); } + nonUniqueKey.append(t.get()); + } + throw new RuntimeException("Cardinality Violation in Merge statement: " + nonUniqueKey); + } + + @Override + public String getDisplayString(String[] children) { + return getStandardDisplayString("cardinality_violation", children, ","); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 9e2179c..c3c263b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -80,6 +80,7 @@ private Driver d; private static enum Table { ACIDTBL("acidTbl"), + ACIDTBLPART("acidTblPart"), ACIDTBL2("acidTbl2"), NONACIDORCTBL("nonAcidOrcTbl"), NONACIDORCTBL2("nonAcidOrcTbl2"); @@ -106,6 +107,7 @@ public void setUp() throws Exception { hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(); File f = new File(TEST_WAREHOUSE_DIR); @@ -120,6 +122,7 @@ public void setUp() throws Exception { d.setMaxRows(10000); dropTables(); runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored 
as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -759,10 +762,11 @@ public void testMergeOnTezEdges() throws Exception { LOG.info("Explain1: " + sb); for(int i = 0; i < explain.size(); i++) { if(explain.get(i).contains("Edges:")) { - Assert.assertTrue(explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)")); + Assert.assertTrue(explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)")); Assert.assertTrue(explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)")); Assert.assertTrue(explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)")); - Assert.assertTrue(explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)")); + Assert.assertTrue(explain.get(i + 3).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)")); + Assert.assertTrue(explain.get(i + 4).contains("Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)")); break; } } @@ -801,6 +805,31 @@ public void testMergeDeleteUpdate() throws Exception { int[][] rExpected = {{5,6},{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); } + + /** + * see https://issues.apache.org/jira/browse/HIVE-14949 for details + * @throws Exception + */ + @Test + public void 
testMergeCardinalityViolation() throws Exception { + int[][] sourceVals = {{2,2},{2,44},{5,5},{11,11}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals)); + int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); + String query = "merge into " + Table.ACIDTBL + + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + + "WHEN MATCHED and s.a < 5 THEN DELETE " + + "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; + runStatementOnDriverNegative(query); + runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')"); + query = "merge into " + Table.ACIDTBLPART + + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + + "WHEN MATCHED and s.a < 5 THEN DELETE " + + "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b, 'p1') "; + runStatementOnDriverNegative(query); + } @Test public void testSetClauseFakeColumn() throws Exception { CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL + diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 5932d7e..e0665bb 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -120,6 +120,8 @@ protected void setUpWithTableProperties(String tableProperties) throws Exception hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); + TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(); File f = new File(TEST_WAREHOUSE_DIR);