diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java new file mode 100644 index 0000000..41e3754 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AcidExportSemanticAnalyzer.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExportWork; +import org.apache.hadoop.hive.ql.session.SessionState; + +/** + * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles + * acid export statements. It works by rewriting the acid export into insert statements into a temporary table, + * and then export it from there. + */ +public class AcidExportSemanticAnalyzer extends RewriteSemanticAnalyzer { + AcidExportSemanticAnalyzer(QueryState queryState) throws SemanticException { + super(queryState); + } + + protected void analyze(ASTNode tree) throws SemanticException { + if (tree.getToken().getType() != HiveParser.TOK_EXPORT) { + throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + + "AcidExportSemanticAnalyzer"); + } + analyzeAcidExport(tree); + } + + /** + * Exporting an Acid table is more complicated than a flat table. 
It may contains delete events, + * which can only be interpreted properly withing the context of the table/metastore where they + * were generated. It may also contain insert events that belong to transactions that aborted + * where the same constraints apply. + * In order to make the export artifact free of these constraints, the export does a + * insert into tmpTable select * from to filter/apply the events in current + * context and then export the tmpTable. This export artifact can now be imported into any + * table on any cluster (subject to schema checks etc). + * See {@link #analyzeAcidExport(ASTNode)} + * @param tree Export statement + * @return true if exporting an Acid table. + */ + public static boolean isAcidExport(ASTNode tree) throws SemanticException { + assert tree != null && tree.getToken() != null && tree.getToken().getType() == HiveParser.TOK_EXPORT; + Tree tokTab = tree.getChild(0); + assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB; + Table tableHandle = null; + try { + tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false); + } catch(HiveException ex) { + throw new SemanticException(ex); + } + + //tableHandle can be null if table doesn't exist + return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle); + } + private static String getTmptTableNameForExport(Table exportTable) { + String tmpTableDb = exportTable.getDbName(); + String tmpTableName = exportTable.getTableName() + "_" + UUID.randomUUID().toString().replace('-', '_'); + return Warehouse.getQualifiedName(tmpTableDb, tmpTableName); + } + + /** + * See {@link #isAcidExport(ASTNode)} + * 1. create the temp table T + * 2. compile 'insert into T select * from acidTable' + * 3. compile 'export acidTable' (acidTable will be replaced with T during execution) + * 4. create task to drop T + * + * Using a true temp (session level) table means it should not affect replication and the table + * is not visible outside the Session that created for security + */ + private void analyzeAcidExport(ASTNode ast) throws SemanticException { + assert ast != null && ast.getToken() != null && ast.getToken().getType() == HiveParser.TOK_EXPORT; + ASTNode tableTree = (ASTNode)ast.getChild(0); + assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB; + ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0); + Table exportTable = getTargetTable(tokRefOrNameExportTable); + assert AcidUtils.isFullAcidTable(exportTable); + + //need to create the table "manually" rather than creating a task since it has to exist to + // compile the insert into T... + String newTableName = getTmptTableNameForExport(exportTable); //this is db.table + Map tblProps = new HashMap<>(); + tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString()); + String location; + + // for temporary tables we set the location to something in the session's scratch dir + // it has the same life cycle as the tmp table + try { + // Generate a unique ID for temp table path. + // This path will be fixed for the life of the temp table. 
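+ // The resulting location is roughly <session temp-table space>/<random-uuid>; the exact prefix depends on how the session scratch dir is configured.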
+ Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString()); + path = Warehouse.getDnsPath(path, conf); + location = path.toString(); + } catch (MetaException err) { + throw new SemanticException("Error while generating temp table path:", err); + } + + CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName, + false, true, null, + null, location, null, null, + tblProps, + true, //important so we get an exception on name collision + Warehouse.getQualifiedName(exportTable.getTTable()), false); + Table newTable; + try { + ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName())); + inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security... + DDLTask createTableTask = (DDLTask) TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf); + createTableTask.setConf(conf); //above get() doesn't set it + createTableTask.execute(new DriverContext(new Context(conf))); + newTable = db.getTable(newTableName); + } catch(IOException|HiveException ex) { + throw new SemanticException(ex); + } + + //now generate insert statement + //insert into newTableName select * from ts + StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(), tokRefOrNameExportTable, tableTree, + newTableName); + ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); + Context rewrittenCtx = rr.rewrittenCtx; + rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery() + ASTNode rewrittenTree = rr.rewrittenTree; + try { + useSuper = true; + //newTable has to exist at this point to compile + super.analyze(rewrittenTree, rewrittenCtx); + } finally { + useSuper = false; + } + //now we have the rootTasks set up for Insert ... Select + removeStatsTasks(rootTasks); + //now make an ExportTask from temp table + /*analyzeExport() creates TableSpec which in turn tries to build + "public List partitions" by looking in the metastore to find Partitions matching + the partition spec in the Export command. These of course don't exist yet since we've not + ran the insert stmt yet!!!!!!! + */ + Task exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName, db, conf, inputs, outputs); + + // Add an alter table task to set transactional props + // do it after populating temp table so that it's written as non-transactional table but + // update props before export so that export archive metadata has these props. This way when + // IMPORT is done for this archive and target table doesn't exist, it will be created as Acid. 
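+ // The temp table was created with transactional=false above; this ALTER flips the flag back to true so the exported metadata describes an Acid table.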
+ AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS); + Map mapProps = new HashMap<>(); + mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + alterTblDesc.setProps(mapProps); + alterTblDesc.setOldName(newTableName); + addExportTask(rootTasks, exportTask, TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc))); + + // Now make a task to drop temp table + // {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType) + ReplicationSpec replicationSpec = new ReplicationSpec(); + DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, false, true, replicationSpec); + Task dropTask = TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf); + exportTask.addDependentTask(dropTask); + markReadEntityForUpdate(); + if (ctx.isExplainPlan()) { + try { + //so that "explain" doesn't "leak" tmp tables + // TODO: catalog + db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true); + } catch(HiveException ex) { + LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex); + } + } + } + + /** + * Generate + * insert into newTableName select * from ts + * for EXPORT command. + */ + private StringBuilder generateExportQuery(List partCols, ASTNode tokRefOrNameExportTable, + ASTNode tableTree, String newTableName) throws SemanticException { + StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName); + addPartitionColsToInsert(partCols, rewrittenQueryStr); + rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable)); + //builds partition spec so we can build suitable WHERE clause + TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true); + if (exportTableSpec.getPartSpec() != null) { + StringBuilder whereClause = null; + int partColsIdx = -1; //keep track of corresponding col in partCols + for (Map.Entry ent : exportTableSpec.getPartSpec().entrySet()) { + partColsIdx++; + if (ent.getValue() == null) { + continue; //partial spec + } + if (whereClause == null) { + whereClause = new StringBuilder(" WHERE "); + } + if (whereClause.length() > " WHERE ".length()) { + whereClause.append(" AND "); + } + whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf)) + .append(" = ").append(genPartValueString(partCols.get(partColsIdx).getType(), ent.getValue())); + } + if (whereClause != null) { + rewrittenQueryStr.append(whereClause); + } + } + return rewrittenQueryStr; + } + + /** + * Makes the exportTask run after all other tasks of the "insert into T ..." are done. 
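+ * Concretely, each leaf task of the insert plan gets alterTable added as a dependent, and exportTask is chained to run after alterTable.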
+ */ + private void addExportTask(List> rootTasks, + Task exportTask, Task alterTable) { + for (Task t : rootTasks) { + if (t.getNumChild() <= 0) { + //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978 + t.addDependentTask(alterTable); + //this is a leaf so add exportTask to follow it + alterTable.addDependentTask(exportTask); + } else { + addExportTask(t.getDependentTasks(), exportTask, alterTable); + } + } + } + + private void removeStatsTasks(List> rootTasks) { + List> statsTasks = findStatsTasks(rootTasks, null); + if (statsTasks == null) { + return; + } + for (Task statsTask : statsTasks) { + if (statsTask.getParentTasks() == null) { + continue; //should never happen + } + for (Task t : new ArrayList<>(statsTask.getParentTasks())) { + t.removeDependentTask(statsTask); + } + } + } + + private List> findStatsTasks( + List> rootTasks, List> statsTasks) { + for (Task t : rootTasks) { + if (t instanceof StatsTask) { + if (statsTasks == null) { + statsTasks = new ArrayList<>(); + } + statsTasks.add(t); + } + if (t.getDependentTasks() != null) { + statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks); + } + } + return statsTasks; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java new file mode 100644 index 0000000..44f7b43 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java @@ -0,0 +1,760 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.antlr.runtime.TokenRewriteStream; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.session.SessionState; + + +/** + * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles + * merge statements. 
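+ * (i.e. MERGE INTO target USING source ON condition WHEN MATCHED [AND predicate] THEN UPDATE or DELETE, WHEN NOT MATCHED THEN INSERT VALUES ...).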
It works by rewriting the updates and deletes into insert statements (since + * they are actually inserts) and then doing some patch up to make them work as merges instead. + */ +public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer { + MergeSemanticAnalyzer(QueryState queryState) throws SemanticException { + super(queryState); + } + + @Override + public void analyze(ASTNode tree) throws SemanticException { + if (tree.getToken().getType() != HiveParser.TOK_MERGE) { + throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + + "MergeSemanticAnalyzer"); + } + analyzeMerge(tree); + } + + private static final String INDENT = " "; + + private IdentifierQuoter quotedIdenfierHelper; + + /** + * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it. + * Since HiveLexer.g is written such that it strips away any ` (back ticks) around + * quoted identifiers we need to add those back to generated SQL. + * Additionally, the parser only produces tokens of type Identifier and never + * QuotedIdentifier (HIVE-6013). So here we just quote all identifiers. + * (') around String literals are retained w/o issues + */ + private static class IdentifierQuoter { + private final TokenRewriteStream trs; + private final IdentityHashMap visitedNodes = new IdentityHashMap<>(); + + IdentifierQuoter(TokenRewriteStream trs) { + this.trs = trs; + if (trs == null) { + throw new IllegalArgumentException("Must have a TokenRewriteStream"); + } + } + + private void visit(ASTNode n) { + if (n.getType() == HiveParser.Identifier) { + if (visitedNodes.containsKey(n)) { + /** + * Since we are modifying the stream, it's not idempotent. Ideally, the caller would take + * care to only quote Identifiers in each subtree once, but this makes it safe + */ + return; + } + visitedNodes.put(n, n); + trs.insertBefore(n.getToken(), "`"); + trs.insertAfter(n.getToken(), "`"); + } + if (n.getChildCount() <= 0) { + return; + } + for (Node c : n.getChildren()) { + visit((ASTNode)c); + } + } + } + + /** + * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without + * needing to understand what it is (except for QuotedIdentifiers). + */ + private String getMatchedText(ASTNode n) { + quotedIdenfierHelper.visit(n); + return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(), + n.getTokenStopIndex() + 1).trim(); + } + + /** + * Here we take a Merge statement AST and generate a semantically equivalent multi-insert + * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible, + * the new SQL statement is made to look like the input SQL statement so that it's easier to map + * Query Compiler errors from generated SQL to original one this way. + * The generated SQL is a complete representation of the original input for the same reason. + * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates. + * If generated SQL doesn't have everything and is patched up later, these coordinates point to + * the wrong place. 
+ * + * @throws SemanticException + */ + private void analyzeMerge(ASTNode tree) throws SemanticException { + quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream()); + /* + * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST + For example, given: + MERGE INTO acidTbl USING nonAcidPart2 source ON acidTbl.a = source.a2 + WHEN MATCHED THEN UPDATE SET b = source.b2 + WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) + + We get AST like this: + "(tok_merge " + + "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " + + "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " + + "(tok_matched " + + "(tok_update " + + "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " + + "(tok_not_matched " + + "tok_insert " + + "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))"); + + And need to produce a multi-insert like this to execute: + FROM acidTbl RIGHT OUTER JOIN nonAcidPart2 ON acidTbl.a = source.a2 + INSERT INTO TABLE acidTbl SELECT nonAcidPart2.a2, nonAcidPart2.b2 WHERE acidTbl.a IS null + INSERT INTO TABLE acidTbl SELECT target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2 + WHERE nonAcidPart2.a2=acidTbl.a SORT BY acidTbl.ROW__ID + */ + /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if + original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but + the errors will point at locations that the user can't map to anything + - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last + in Select clause of Insert as Select + todo: do we care to preserve comments in original SQL? + todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent + Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse... + todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when + source is empty? This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0 + rows. 
If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error + */ + ASTNode target = (ASTNode)tree.getChild(0); + ASTNode source = (ASTNode)tree.getChild(1); + String targetName = getSimpleTableName(target); + String sourceName = getSimpleTableName(source); + ASTNode onClause = (ASTNode) tree.getChild(2); + String onClauseAsText = getMatchedText(onClause); + + int whenClauseBegins = 3; + boolean hasHint = false; + // query hint + ASTNode qHint = (ASTNode) tree.getChild(3); + if (qHint.getType() == HiveParser.QUERY_HINT) { + hasHint = true; + whenClauseBegins++; + } + Table targetTable = getTargetTable(target); + validateTargetTable(targetTable); + List whenClauses = findWhenClauses(tree, whenClauseBegins); + + StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n"); + + rewrittenQueryStr.append(INDENT).append(getFullTableNameForSQL(target)); + if (isAliased(target)) { + rewrittenQueryStr.append(" ").append(targetName); + } + rewrittenQueryStr.append('\n'); + rewrittenQueryStr.append(INDENT).append(chooseJoinType(whenClauses)).append("\n"); + if (source.getType() == HiveParser.TOK_SUBQUERY) { + //this includes the mandatory alias + rewrittenQueryStr.append(INDENT).append(getMatchedText(source)); + } else { + rewrittenQueryStr.append(INDENT).append(getFullTableNameForSQL(source)); + if (isAliased(source)) { + rewrittenQueryStr.append(" ").append(sourceName); + } + } + rewrittenQueryStr.append('\n'); + rewrittenQueryStr.append(INDENT).append("ON ").append(onClauseAsText).append('\n'); + + // Add the hint if any + String hintStr = null; + if (hasHint) { + hintStr = " /*+ " + qHint.getText() + " */ "; + } + + /** + * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete + * If we have both update and delete, the 1st one (in SQL code) must have "AND " + * so that the 2nd can ensure not to process the same rows. + * Update and Delete may be in any order. (Insert is always last) + */ + String extraPredicate = null; + int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0; + int numInsertClauses = 0; + boolean hintProcessed = false; + for (ASTNode whenClause : whenClauses) { + switch (getWhenClauseOperation(whenClause).getType()) { + case HiveParser.TOK_INSERT: + numInsertClauses++; + handleInsert(whenClause, rewrittenQueryStr, target, onClause, + targetTable, targetName, onClauseAsText, hintProcessed ? null : hintStr); + hintProcessed = true; + break; + case HiveParser.TOK_UPDATE: + numWhenMatchedUpdateClauses++; + String s = handleUpdate(whenClause, rewrittenQueryStr, target, + onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr); + hintProcessed = true; + if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { + extraPredicate = s; //i.e. it's the 1st WHEN MATCHED + } + break; + case HiveParser.TOK_DELETE: + numWhenMatchedDeleteClauses++; + String s1 = handleDelete(whenClause, rewrittenQueryStr, target, + onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr); + hintProcessed = true; + if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { + extraPredicate = s1; //i.e. 
it's the 1st WHEN MATCHED + } + break; + default: + throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() + + addParseInfo(whenClause)); + } + if (numWhenMatchedDeleteClauses > 1) { + throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd()); + } + if (numWhenMatchedUpdateClauses > 1) { + throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd()); + } + assert numInsertClauses < 2: "too many Insert clauses"; + } + if (numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { + throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); + } + + boolean validating = handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText, targetTable, + numWhenMatchedDeleteClauses == 0 && numWhenMatchedUpdateClauses == 0); + ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); + Context rewrittenCtx = rr.rewrittenCtx; + ASTNode rewrittenTree = rr.rewrittenTree; + rewrittenCtx.setOperation(Context.Operation.MERGE); + + //set dest name mapping on new context; 1st chid is TOK_FROM + for (int insClauseIdx = 1, whenClauseIdx = 0; + insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/); + insClauseIdx++, whenClauseIdx++) { + //we've added Insert clauses in order or WHEN items in whenClauses + switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) { + case HiveParser.TOK_INSERT: + rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT); + break; + case HiveParser.TOK_UPDATE: + rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE); + break; + case HiveParser.TOK_DELETE: + rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE); + break; + default: + assert false; + } + } + if (validating) { + //here means the last branch of the multi-insert is Cardinality Validation + rewrittenCtx.addDestNamePrefix(rewrittenTree.getChildCount() - 1, Context.DestClausePrefix.INSERT); + } + + try { + useSuper = true; + super.analyze(rewrittenTree, rewrittenCtx); + } finally { + useSuper = false; + } + updateOutputs(targetTable); + } + + /** + * If there is no WHEN NOT MATCHED THEN INSERT, we don't outer join. + */ + private String chooseJoinType(List whenClauses) { + for (ASTNode whenClause : whenClauses) { + if (getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) { + return "RIGHT OUTER JOIN"; + } + } + return "INNER JOIN"; + } + + /** + * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B, + * an error should be raised if > 1 row of "source" matches the same row in "target". + * This should not affect the runtime of the query as it's running in parallel with other + * branches of the multi-insert. 
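+ * The check is emitted as one more INSERT leg of the multi-insert: it selects cardinality_violation(target.ROW__ID plus any partition columns), grouped by those same expressions, with HAVING count(*) > 1.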
It won't actually write any data to merge_tmp_table since the + * cardinality_violation() UDF throws an error whenever it's called killing the query + * @return true if another Insert clause was added + */ + private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target, + String onClauseAsString, Table targetTable, boolean onlyHaveWhenNotMatchedClause) + throws SemanticException { + if (!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) { + LOG.info("Merge statement cardinality violation check is disabled: " + + HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname); + return false; + } + if (onlyHaveWhenNotMatchedClause) { + //if no update or delete in Merge, there is no need to to do cardinality check + return false; + } + //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a + // given session, i.e. the name can be fixed across all invocations + String tableName = "merge_tmp_table"; + rewrittenQueryStr.append("\nINSERT INTO ").append(tableName) + .append("\n SELECT cardinality_violation(") + .append(getSimpleTableName(target)).append(".ROW__ID"); + addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); + + rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString) + .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID"); + + addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); + + rewrittenQueryStr.append(" HAVING count(*) > 1"); + //say table T has partition p, we are generating + //select cardinality_violation(ROW_ID, p) WHERE ... GROUP BY ROW__ID, p + //the Group By args are passed to cardinality_violation to add the violating value to the error msg + try { + if (null == db.getTable(tableName, false)) { + StorageFormat format = new StorageFormat(conf); + format.processStorageFormat("TextFile"); + Table table = db.newTable(tableName); + table.setSerializationLib(format.getSerde()); + List fields = new ArrayList(); + fields.add(new FieldSchema("val", "int", null)); + table.setFields(fields); + table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(), + tableName), conf)); + table.getTTable().setTemporary(true); + table.setStoredAsSubDirectories(false); + table.setInputFormatClass(format.getInputFormat()); + table.setOutputFormatClass(format.getOutputFormat()); + db.createTable(table, true); + } + } catch(HiveException|MetaException e) { + throw new SemanticException(e.getMessage(), e); + } + return true; + } + + /** + * @param onClauseAsString - because there is no clone() and we need to use in multiple places + * @param deleteExtraPredicate - see notes at caller + */ + private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target, + String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr) + throws SemanticException { + assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED; + assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE; + String targetName = getSimpleTableName(target); + rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); + addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr); + rewrittenQueryStr.append(" -- update clause\n SELECT "); + if (hintStr != null) { + rewrittenQueryStr.append(hintStr); + } + rewrittenQueryStr.append(targetName).append(".ROW__ID"); + + ASTNode setClause = 
(ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0); + //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions + //before reparsing, i.e. they are known to SemanticAnalyzer logic + Map setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable); + //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end + //up with + //insert into target (p1) select current_date(), 5, c3, p1 where .... + //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table + //names + List nonPartCols = targetTable.getCols(); + for (FieldSchema fs : nonPartCols) { + rewrittenQueryStr.append(", "); + String name = fs.getName(); + if (setColsExprs.containsKey(name)) { + String rhsExp = getMatchedText(setColsExprs.get(name)); + //"set a=5, b=8" - rhsExp picks up the next char (e.g. ',') from the token stream + switch (rhsExp.charAt(rhsExp.length() - 1)) { + case ',': + case '\n': + rhsExp = rhsExp.substring(0, rhsExp.length() - 1); + break; + default: + //do nothing + } + rewrittenQueryStr.append(rhsExp); + } else { + rewrittenQueryStr.append(getSimpleTableName(target)) + .append(".") + .append(HiveUtils.unparseIdentifier(name, this.conf)); + } + } + addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); + rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); + String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause); + if (extraPredicate != null) { + //we have WHEN MATCHED AND THEN DELETE + rewrittenQueryStr.append(" AND ").append(extraPredicate); + } + if (deleteExtraPredicate != null) { + rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")"); + } + rewrittenQueryStr.append("\n SORT BY "); + rewrittenQueryStr.append(targetName).append(".ROW__ID \n"); + + setUpAccessControlInfoForUpdate(targetTable, setColsExprs); + //we don't deal with columns on RHS of SET expression since the whole expr is part of the + //rewritten SQL statement and is thus handled by SemanticAnalzyer. 
Nor do we have to + //figure which cols on RHS are from source and which from target + + return extraPredicate; + } + + /** + * @param onClauseAsString - because there is no clone() and we need to use in multiple places + * @param updateExtraPredicate - see notes at caller + */ + private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target, + String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr) + throws SemanticException { + assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED; + assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE; + List partCols = targetTable.getPartCols(); + String targetName = getSimpleTableName(target); + rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); + addPartitionColsToInsert(partCols, rewrittenQueryStr); + + rewrittenQueryStr.append(" -- delete clause\n SELECT "); + if (hintStr != null) { + rewrittenQueryStr.append(hintStr); + } + rewrittenQueryStr.append(targetName).append(".ROW__ID "); + addPartitionColsToSelect(partCols, rewrittenQueryStr, target); + rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); + String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause); + if (extraPredicate != null) { + //we have WHEN MATCHED AND THEN DELETE + rewrittenQueryStr.append(" AND ").append(extraPredicate); + } + if (updateExtraPredicate != null) { + rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")"); + } + rewrittenQueryStr.append("\n SORT BY "); + rewrittenQueryStr.append(targetName).append(".ROW__ID \n"); + return extraPredicate; + } + + private static String addParseInfo(ASTNode n) { + return " at " + ErrorMsg.renderPosition(n); + } + + private boolean isAliased(ASTNode n) { + switch (n.getType()) { + case HiveParser.TOK_TABREF: + return findTabRefIdxs(n)[0] != 0; + case HiveParser.TOK_TABNAME: + return false; + case HiveParser.TOK_SUBQUERY: + assert n.getChildCount() > 1 : "Expected Derived Table to be aliased"; + return true; + default: + throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n); + } + } + + /** + * Collect WHEN clauses from Merge statement AST. + */ + private List findWhenClauses(ASTNode tree, int start) throws SemanticException { + assert tree.getType() == HiveParser.TOK_MERGE; + List whenClauses = new ArrayList<>(); + for (int idx = start; idx < tree.getChildCount(); idx++) { + ASTNode whenClause = (ASTNode)tree.getChild(idx); + assert whenClause.getType() == HiveParser.TOK_MATCHED || + whenClause.getType() == HiveParser.TOK_NOT_MATCHED : + "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause); + whenClauses.add(whenClause); + } + if (whenClauses.size() <= 0) { + //Futureproofing: the parser will actually not allow this + throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement"); + } + return whenClauses; + } + + private ASTNode getWhenClauseOperation(ASTNode whenClause) { + if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) { + throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause); + } + return (ASTNode) whenClause.getChild(0); + } + + /** + * Returns the as in WHEN MATCHED AND THEN... 
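+ * (i.e. the optional extra predicate that can follow AND in a WHEN [NOT] MATCHED AND ... THEN clause).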
+ * @return may be null + */ + private String getWhenClausePredicate(ASTNode whenClause) { + if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) { + throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause); + } + if (whenClause.getChildCount() == 2) { + return getMatchedText((ASTNode)whenClause.getChild(1)); + } + return null; + } + + /** + * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause. + * @param targetTableNameInSourceQuery - simple name/alias + * @throws SemanticException + */ + private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target, + ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery, String onClauseAsString, + String hintStr) throws SemanticException { + ASTNode whenClauseOperation = getWhenClauseOperation(whenNotMatchedClause); + assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED; + assert whenClauseOperation.getType() == HiveParser.TOK_INSERT; + + // identify the node that contains the values to insert and the optional column list node + ArrayList children = whenClauseOperation.getChildren(); + ASTNode valuesNode = + (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_FUNCTION).findFirst().get(); + ASTNode columnListNode = + (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_TABCOLNAME).findFirst() + .orElse(null); + + // if column list is specified, then it has to have the same number of elements as the values + // valuesNode has a child for struct, the rest are the columns + if (columnListNode != null && columnListNode.getChildCount() != (valuesNode.getChildCount() - 1)) { + throw new SemanticException(String.format("Column schema must have the same length as values (%d vs %d)", + columnListNode.getChildCount(), valuesNode.getChildCount() - 1)); + } + + rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); + if (columnListNode != null) { + rewrittenQueryStr.append(' ').append(getMatchedText(columnListNode)); + } + addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr); + + rewrittenQueryStr.append(" -- insert clause\n SELECT "); + if (hintStr != null) { + rewrittenQueryStr.append(hintStr); + } + + OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery, + conf, onClauseAsString); + oca.analyze(); + + String valuesClause = getMatchedText(valuesNode); + valuesClause = valuesClause.substring(1, valuesClause.length() - 1); //strip '(' and ')' + valuesClause = replaceDefaultKeywordForMerge(valuesClause, targetTable, columnListNode); + rewrittenQueryStr.append(valuesClause).append("\n WHERE ").append(oca.getPredicate()); + + String extraPredicate = getWhenClausePredicate(whenNotMatchedClause); + if (extraPredicate != null) { + //we have WHEN NOT MATCHED AND THEN INSERT + rewrittenQueryStr.append(" AND ") + .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n'); + } + } + + private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode) + throws SemanticException { + if (!valueClause.toLowerCase().contains("`default`")) { + return valueClause; + } + + Map colNameToDefaultConstraint = getColNameToDefaultValueMap(table); + String[] values = valueClause.trim().split(","); + String[] replacedValues = new String[values.length]; + + // the list of the column names may be set in 
the query + String[] columnNames = columnListNode == null ? + table.getAllCols().stream().map(f -> f.getName()).toArray(size -> new String[size]) : + columnListNode.getChildren().stream().map(n -> ((ASTNode)n).toString()).toArray(size -> new String[size]); + + for (int i = 0; i < values.length; i++) { + if (values[i].trim().toLowerCase().equals("`default`")) { + replacedValues[i] = MapUtils.getString(colNameToDefaultConstraint, columnNames[i], "null"); + } else { + replacedValues[i] = values[i]; + } + } + return StringUtils.join(replacedValues, ','); + } + + /** + * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume, that 'c' is from + * target table and 'd' is from source expression. In order to properly + * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where + * clause of this Insert contains "target.a is null and target.c is null" This ensures that this + * Insert leg does not receive any rows that are processed by Insert corresponding to + * WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if an + * unqualified column is part of the target table. We can get away with this simple logic because + * we know that target is always a table (as opposed to some derived table). + * The job of this class is to generate this predicate. + * + * Note that is this predicate cannot simply be NOT(on-clause-expr). IF on-clause-expr evaluates + * to Unknown, it will be treated as False in the WHEN MATCHED Inserts but NOT(Unknown) = Unknown, + * and so it will be False for WHEN NOT MATCHED Insert... + */ + private static final class OnClauseAnalyzer { + private final ASTNode onClause; + private final Map> table2column = new HashMap<>(); + private final List unresolvedColumns = new ArrayList<>(); + private final List allTargetTableColumns = new ArrayList<>(); + private final Set tableNamesFound = new HashSet<>(); + private final String targetTableNameInSourceQuery; + private final HiveConf conf; + private final String onClauseAsString; + + /** + * @param targetTableNameInSourceQuery alias or simple name + */ + OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery, + HiveConf conf, String onClauseAsString) { + this.onClause = onClause; + allTargetTableColumns.addAll(targetTable.getCols()); + allTargetTableColumns.addAll(targetTable.getPartCols()); + this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery); + this.conf = conf; + this.onClauseAsString = onClauseAsString; + } + + /** + * Finds all columns and groups by table ref (if there is one). + */ + private void visit(ASTNode n) { + if (n.getType() == HiveParser.TOK_TABLE_OR_COL) { + ASTNode parent = (ASTNode) n.getParent(); + if (parent != null && parent.getType() == HiveParser.DOT) { + //the ref must be a table, so look for column name as right child of DOT + if (parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) { + //I don't think this can happen... 
but just in case + throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClauseAsString); + } + addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText()); + } else { + //must be just a column name + unresolvedColumns.add(n.getChild(0).getText()); + } + } + if (n.getChildCount() == 0) { + return; + } + for (Node child : n.getChildren()) { + visit((ASTNode)child); + } + } + + private void analyze() { + visit(onClause); + if (tableNamesFound.size() > 2) { + throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " + + tableNamesFound + " in " + onClauseAsString); + } + handleUnresolvedColumns(); + if (tableNamesFound.size() > 2) { + throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " + + "Found " + tableNamesFound + " in " + onClauseAsString); + } + } + + /** + * Find those that belong to target table. + */ + private void handleUnresolvedColumns() { + if (unresolvedColumns.isEmpty()) { + return; + } + for (String c : unresolvedColumns) { + for (FieldSchema fs : allTargetTableColumns) { + if (c.equalsIgnoreCase(fs.getName())) { + //c belongs to target table; strictly speaking there maybe an ambiguous ref but + //this will be caught later when multi-insert is parsed + addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c); + break; + } + } + } + } + + private void addColumn2Table(String tableName, String columnName) { + tableName = tableName.toLowerCase(); //normalize name for mapping + tableNamesFound.add(tableName); + List cols = table2column.get(tableName); + if (cols == null) { + cols = new ArrayList<>(); + table2column.put(tableName, cols); + } + //we want to preserve 'columnName' as it was in original input query so that rewrite + //looks as much as possible like original query + cols.add(columnName); + } + + /** + * Now generate the predicate for Where clause. + */ + private String getPredicate() { + //normilize table name for mapping + List targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase()); + if (targetCols == null) { + /*e.g. ON source.t=1 + * this is not strictly speaking invalid but it does ensure that all columns from target + * table are all NULL for every row. This would make any WHEN MATCHED clause invalid since + * we don't have a ROW__ID. The WHEN NOT MATCHED could be meaningful but it's just data from + * source satisfying source.t=1... not worth the effort to support this*/ + throw new IllegalArgumentException(ErrorMsg.INVALID_TABLE_IN_ON_CLAUSE_OF_MERGE + .format(targetTableNameInSourceQuery, onClauseAsString)); + } + StringBuilder sb = new StringBuilder(); + for (String col : targetCols) { + if (sb.length() > 0) { + sb.append(" AND "); + } + //but preserve table name in SQL + sb.append(HiveUtils.unparseIdentifier(targetTableNameInSourceQuery, conf)) + .append(".") + .append(HiveUtils.unparseIdentifier(col, conf)) + .append(" IS NULL"); + } + return sb.toString(); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java new file mode 100644 index 0000000..6caac11 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.InvalidTableException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles + * update, delete and merge statements. It works by rewriting the updates and deletes into insert + * statements (since they are actually inserts) and then doing some patch up to make them work as + * updates and deletes instead. + */ +public abstract class RewriteSemanticAnalyzer extends SemanticAnalyzer { + protected static final Logger LOG = LoggerFactory.getLogger(RewriteSemanticAnalyzer.class); + + protected boolean useSuper = false; + + RewriteSemanticAnalyzer(QueryState queryState) throws SemanticException { + super(queryState); + } + + @Override + public void analyzeInternal(ASTNode tree) throws SemanticException { + if (useSuper) { + super.analyzeInternal(tree); + } else { + if (!getTxnMgr().supportsAcid()) { + throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg()); + } + analyze(tree); + cleanUpMetaColumnAccessControl(); + } + } + + protected abstract void analyze(ASTNode tree) throws SemanticException; + + /** + * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2 + * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2... + * @param target target table + */ + protected void addPartitionColsToSelect(List partCols, StringBuilder rewrittenQueryStr, + ASTNode target) throws SemanticException { + String targetName = target != null ? getSimpleTableName(target) : null; + + // If the table is partitioned, we need to select the partition columns as well. 
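+ // e.g. for a target aliased "t" and partitioned by (ds, hr) this appends something like ", t.ds, t.hr" (identifiers may come out back-quoted).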
+ if (partCols != null) { + for (FieldSchema fschema : partCols) { + rewrittenQueryStr.append(", "); + //would be nice if there was a way to determine if quotes are needed + if (targetName != null) { + rewrittenQueryStr.append(targetName).append('.'); + } + rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf)); + } + } + } + + /** + * Assert that we are not asked to update a bucketing column or partition column. + * @param colName it's the A in "SET A = B" + */ + protected void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throws SemanticException { + String columnName = normalizeColName(colName.getText()); + + // Make sure this isn't one of the partitioning columns, that's not supported. + for (FieldSchema fschema : targetTable.getPartCols()) { + if (fschema.getName().equalsIgnoreCase(columnName)) { + throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg()); + } + } + //updating bucket column should move row from one file to another - not supported + if (targetTable.getBucketCols() != null && targetTable.getBucketCols().contains(columnName)) { + throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE, columnName); + } + boolean foundColumnInTargetTable = false; + for (FieldSchema col : targetTable.getCols()) { + if (columnName.equalsIgnoreCase(col.getName())) { + foundColumnInTargetTable = true; + break; + } + } + if (!foundColumnInTargetTable) { + throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(), + targetTable.getFullyQualifiedName()); + } + } + + protected ASTNode findLHSofAssignment(ASTNode assignment) { + assert assignment.getToken().getType() == HiveParser.EQUAL : + "Expected set assignments to use equals operator but found " + assignment.getName(); + ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0); + assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL : + "Expected left side of assignment to be table or column"; + ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0); + assert colName.getToken().getType() == HiveParser.Identifier : + "Expected column name"; + return colName; + } + + protected Map collectSetColumnsAndExpressions(ASTNode setClause, + Set setRCols, Table targetTable) throws SemanticException { + // An update needs to select all of the columns, as we rewrite the entire row. Also, + // we need to figure out which columns we are going to replace. 
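+ // e.g. for "SET b = 5, c = d + 1" the result maps the lower-cased column names to their RHS expression nodes: {b -> 5, c -> d + 1}.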
+ assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE : + "Expected second child of update token to be set token"; + + // Get the children of the set clause, each of which should be a column assignment + List assignments = setClause.getChildren(); + // Must be deterministic order map for consistent q-test output across Java versions + Map setCols = new LinkedHashMap(assignments.size()); + for (Node a : assignments) { + ASTNode assignment = (ASTNode)a; + ASTNode colName = findLHSofAssignment(assignment); + if (setRCols != null) { + addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols); + } + checkValidSetClauseTarget(colName, targetTable); + + String columnName = normalizeColName(colName.getText()); + // This means that in UPDATE T SET x = _something_ + // _something_ can be whatever is supported in SELECT _something_ + setCols.put(columnName, (ASTNode)assignment.getChildren().get(1)); + } + return setCols; + } + + /** + * @return the Metastore representation of the target table + */ + protected Table getTargetTable(ASTNode tabRef) throws SemanticException { + return getTable(tabRef, db, true); + } + + /** + * @param throwException if false, return null if table doesn't exist, else throw + */ + protected static Table getTable(ASTNode tabRef, Hive db, boolean throwException) throws SemanticException { + String[] tableName; + switch (tabRef.getType()) { + case HiveParser.TOK_TABREF: + tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0)); + break; + case HiveParser.TOK_TABNAME: + tableName = getQualifiedTableName(tabRef); + break; + default: + throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef); + } + + Table mTable; + try { + mTable = db.getTable(tableName[0], tableName[1], throwException); + } catch (InvalidTableException e) { + LOG.error("Failed to find table " + getDotName(tableName) + " got exception " + e.getMessage()); + throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e); + } catch (HiveException e) { + LOG.error("Failed to find table " + getDotName(tableName) + " got exception " + e.getMessage()); + throw new SemanticException(e.getMessage(), e); + } + return mTable; + } + + /** + * Walk through all our inputs and set them to note that this read is part of an update or a delete. + */ + protected void markReadEntityForUpdate() { + for (ReadEntity input : inputs) { + if (isWritten(input)) { + //TODO: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock + //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity + //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries) + //so DbTxnManager skips Read lock on the ReadEntity.... + input.setUpdateOrDelete(true); //input.noLockNeeded()? + } + } + } + + /** + * For updates, we need to set the column access info so that it contains information on + * the columns we are updating. + * (But not all the columns of the target table even though the rewritten query writes + * all columns of target table since that is an implmentation detail). 
+ */ + protected void setUpAccessControlInfoForUpdate(Table mTable, Map setCols) { + ColumnAccessInfo cai = new ColumnAccessInfo(); + for (String colName : setCols.keySet()) { + cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName); + } + setUpdateColumnAccessInfo(cai); + } + + /** + * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to + * require the user to have authorization on that column. + */ + private void cleanUpMetaColumnAccessControl() { + //we do this for Update/Delete (incl Merge) because we introduce this column into the query + //as part of rewrite + if (columnAccessInfo != null) { + columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID); + } + } + + /** + * Parse the newly generated SQL statement to get a new AST. + */ + protected ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery) + throws SemanticException { + // Set dynamic partitioning to nonstrict so that queries do not need any partition + // references. + // TODO: this may be a perf issue as it prevents the optimizer.. or not + HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + // Disable LLAP IO wrapper; doesn't propagate extra ACID columns correctly. + HiveConf.setBoolVar(conf, ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED, false); + // Parse the rewritten query string + Context rewrittenCtx; + try { + rewrittenCtx = new Context(conf); + rewrittenCtx.setHDFSCleanup(true); + // We keep track of all the contexts that are created by this query + // so we can clear them when we finish execution + ctx.addRewrittenStatementContext(rewrittenCtx); + } catch (IOException e) { + throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg()); + } + rewrittenCtx.setExplainConfig(ctx.getExplainConfig()); + rewrittenCtx.setExplainPlan(ctx.isExplainPlan()); + rewrittenCtx.setStatsSource(ctx.getStatsSource()); + rewrittenCtx.setPlanMapper(ctx.getPlanMapper()); + rewrittenCtx.setIsUpdateDeleteMerge(true); + rewrittenCtx.setCmd(rewrittenQueryStr.toString()); + + ASTNode rewrittenTree; + try { + LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">"); + rewrittenTree = ParseUtils.parse(rewrittenQueryStr.toString(), rewrittenCtx); + } catch (ParseException e) { + throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e); + } + return new ReparseResult(rewrittenTree, rewrittenCtx); + } + + /** + * Assert it supports Acid write. + */ + protected void validateTargetTable(Table mTable) throws SemanticException { + if (mTable.getTableType() == TableType.VIRTUAL_VIEW || mTable.getTableType() == TableType.MATERIALIZED_VIEW) { + LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view"); + throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg()); + } + } + + /** + * Check that {@code readEntity} is also being written. + */ + private boolean isWritten(Entity readEntity) { + for (Entity writeEntity : outputs) { + //make sure to compare them as Entity, i.e. that it's the same table or partition, etc + if (writeEntity.toString().equalsIgnoreCase(readEntity.toString())) { + return true; + } + } + return false; + } + + // This method finds any columns on the right side of a set statement (thus rcols) and puts them + // in a set so we can add them to the list of input cols to check. + private void addSetRCols(ASTNode node, Set setRCols) { + + // See if this node is a TOK_TABLE_OR_COL. 
If so, find the value and put it in the list. If + // not, recurse on any children + if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) { + ASTNode colName = (ASTNode)node.getChildren().get(0); + assert colName.getToken().getType() == HiveParser.Identifier : + "Expected column name"; + setRCols.add(normalizeColName(colName.getText())); + } else if (node.getChildren() != null) { + for (Node n : node.getChildren()) { + addSetRCols((ASTNode)n, setRCols); + } + } + } + + /** + * Column names are stored in metastore in lower case, regardless of the CREATE TABLE statement. + * Unfortunately there is no single place that normalizes the input query. + * @param colName not null + */ + private static String normalizeColName(String colName) { + return colName.toLowerCase(); + } + + /** + * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check + * if the read and write are of the same table in "insert ... select ....". Since DbTxnManager + * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we + * have change the table WriteEntity to a set of partition WriteEntity objects based on + * ReadEntity objects computed for this table. + */ + protected void updateOutputs(Table targetTable) { + markReadEntityForUpdate(); + + if (targetTable.isPartitioned()) { + List partitionsRead = getRestrictedPartitionSet(targetTable); + if (!partitionsRead.isEmpty()) { + // if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with + // WriteEntity for each partition + List toRemove = new ArrayList<>(); + for (WriteEntity we : outputs) { + WriteEntity.WriteType wt = we.getWriteType(); + if (isTargetTable(we, targetTable) && + (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) { + // The assumption here is that SemanticAnalyzer will will generate ReadEntity for each + // partition that exists and is matched by the WHERE clause (which may be all of them). + // Since we don't allow updating the value of a partition column, we know that we always + // write the same (or fewer) partitions than we read. Still, the write is a Dynamic + // Partition write - see HIVE-15032. + toRemove.add(we); + } + } + outputs.removeAll(toRemove); + // TODO: why is this like that? + for (ReadEntity re : partitionsRead) { + for (WriteEntity original : toRemove) { + //since we may have both Update and Delete branches, Auth needs to know + WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType()); + we.setDynamicPartitionWrite(original.isDynamicPartitionWrite()); + outputs.add(we); + } + } + } + } + } + + /** + * If the optimizer has determined that it only has to read some of the partitions of the + * target table to satisfy the query, then we know that the write side of update/delete + * (and update/delete parts of merge) + * can only write (at most) that set of partitions (since we currently don't allow updating + * partition (or bucket) columns). So we want to replace the table level + * WriteEntity in the outputs with WriteEntity for each of these partitions + * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any + * insert which does a select against the same table. Then SemanticAnalyzer would also + * be able to not use DP for the Insert... + * + * Note that the Insert of Merge may be creating new partitions and writing to partitions + * which were not read (WHEN NOT MATCHED...). 
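A standalone sketch of the narrowing described above, with illustrative TableWrite/PartitionRead/PartitionWrite records standing in for Hive's ReadEntity/WriteEntity types; it is a simplified model, not the patch's code.

import java.util.ArrayList;
import java.util.List;

// Swap a table-level UPDATE/DELETE write for one write per partition that was read.
class PartitionWriteSketch {
  enum WriteType { UPDATE, DELETE, INSERT }
  record TableWrite(String table, WriteType type) {}
  record PartitionRead(String table, String partition) {}
  record PartitionWrite(String table, String partition, WriteType type) {}

  static List<PartitionWrite> narrowToPartitions(String targetTable,
      List<TableWrite> tableWrites, List<PartitionRead> partitionsRead) {
    List<PartitionWrite> result = new ArrayList<>();
    for (TableWrite tw : tableWrites) {
      // Only UPDATE/DELETE writes against the target table are narrowed;
      // other writes keep their table-level granularity.
      if (!tw.table().equals(targetTable)
          || (tw.type() != WriteType.UPDATE && tw.type() != WriteType.DELETE)) {
        continue;
      }
      for (PartitionRead pr : partitionsRead) {
        if (pr.table().equals(targetTable)) {
          result.add(new PartitionWrite(pr.table(), pr.partition(), tw.type()));
        }
      }
    }
    return result;
  }
}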
WriteEntity for that should be created + * in MoveTask (or some other task after the query is complete). + */ + private List getRestrictedPartitionSet(Table targetTable) { + List partitionsRead = new ArrayList<>(); + for (ReadEntity re : inputs) { + if (re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) { + partitionsRead.add(re); + } + } + return partitionsRead; + } + + /** + * Does this Entity belong to target table (partition). + */ + private boolean isTargetTable(Entity entity, Table targetTable) { + //todo: https://issues.apache.org/jira/browse/HIVE-15048 + /** + * is this the right way to compare? Should it just compare paths? + * equals() impl looks heavy weight + */ + return targetTable.equals(entity.getTable()); + } + + /** + * Returns the table name to use in the generated query preserving original quotes/escapes if any. + * @see #getFullTableNameForSQL(ASTNode) + */ + protected String getSimpleTableName(ASTNode n) throws SemanticException { + return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf); + } + + protected String getSimpleTableNameBase(ASTNode n) throws SemanticException { + switch (n.getType()) { + case HiveParser.TOK_TABREF: + int aliasIndex = findTabRefIdxs(n)[0]; + if (aliasIndex != 0) { + return n.getChild(aliasIndex).getText(); //the alias + } + return getSimpleTableNameBase((ASTNode) n.getChild(0)); + case HiveParser.TOK_TABNAME: + if (n.getChildCount() == 2) { + //db.table -> return table + return n.getChild(1).getText(); + } + return n.getChild(0).getText(); + case HiveParser.TOK_SUBQUERY: + return n.getChild(1).getText(); //the alias + default: + throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n); + } + } + + protected static final class ReparseResult { + final ASTNode rewrittenTree; + final Context rewrittenCtx; + ReparseResult(ASTNode n, Context c) { + rewrittenTree = n; + rewrittenCtx = c; + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 088b5cf..51a6b2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -216,8 +216,8 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: - if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { - return new UpdateDeleteSemanticAnalyzer(queryState); + if (AcidExportSemanticAnalyzer.isAcidExport(tree)) { + return new AcidExportSemanticAnalyzer(queryState); } return new ExportSemanticAnalyzer(queryState); case HiveParser.TOK_IMPORT: @@ -368,9 +368,11 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_UPDATE_TABLE: case HiveParser.TOK_DELETE_FROM: - case HiveParser.TOK_MERGE: return new UpdateDeleteSemanticAnalyzer(queryState); + case HiveParser.TOK_MERGE: + return new MergeSemanticAnalyzer(queryState); + case HiveParser.TOK_START_TRANSACTION: case HiveParser.TOK_COMMIT: case HiveParser.TOK_ROLLBACK: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 8651afd..179021e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -17,603 +17,89 @@ */ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; -import org.antlr.runtime.TokenRewriteStream; -import org.antlr.runtime.tree.Tree; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.DDLTask; -import org.apache.hadoop.hive.ql.exec.StatsTask; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.hooks.Entity; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.DropTableDesc; -import org.apache.hadoop.hive.ql.plan.ExportWork; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles - * update, delete and merge statements. It works by rewriting the updates and deletes into insert + * update and delete statements. It works by rewriting the updates and deletes into insert * statements (since they are actually inserts) and then doing some patch up to make them work as * updates and deletes instead. 
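The factory change amounts to a per-statement-token dispatch. A simplified sketch of that dispatch follows; the enum values and the analyzer names returned as strings are purely illustrative, not the factory's actual signature.

// One analyzer kind per statement token; EXPORT only takes the rewrite path for full-ACID tables.
class AnalyzerDispatchSketch {
  enum Token { UPDATE_TABLE, DELETE_FROM, MERGE, EXPORT }

  static String analyzerFor(Token token, boolean acidExport) {
    switch (token) {
      case UPDATE_TABLE:
      case DELETE_FROM:
        return "UpdateDeleteSemanticAnalyzer";
      case MERGE:
        return "MergeSemanticAnalyzer";
      case EXPORT:
        return acidExport ? "AcidExportSemanticAnalyzer" : "ExportSemanticAnalyzer";
      default:
        throw new IllegalArgumentException("Unexpected token: " + token);
    }
  }
}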
*/ -public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer { - private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class); +public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer { - private boolean useSuper = false; + private Context.Operation operation = Context.Operation.OTHER; UpdateDeleteSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); } - @Override - public void analyzeInternal(ASTNode tree) throws SemanticException { - if (useSuper) { - super.analyzeInternal(tree); - } else { - if (!getTxnMgr().supportsAcid()) { - throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg()); - } - switch (tree.getToken().getType()) { - case HiveParser.TOK_DELETE_FROM: - analyzeDelete(tree); - break; - case HiveParser.TOK_UPDATE_TABLE: - analyzeUpdate(tree); - break; - case HiveParser.TOK_MERGE: - analyzeMerge(tree); - break; - case HiveParser.TOK_EXPORT: - analyzeAcidExport(tree); - break; - default: - throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + - "UpdateDeleteSemanticAnalyzer"); - } - cleanUpMetaColumnAccessControl(); - - } - } - private boolean updating() { - return currentOperation == Context.Operation.UPDATE; - } - private boolean deleting() { - return currentOperation == Context.Operation.DELETE; - } - - /** - * Exporting an Acid table is more complicated than a flat table. It may contains delete events, - * which can only be interpreted properly withing the context of the table/metastore where they - * were generated. It may also contain insert events that belong to transactions that aborted - * where the same constraints apply. - * In order to make the export artifact free of these constraints, the export does a - * insert into tmpTable select * from to filter/apply the events in current - * context and then export the tmpTable. This export artifact can now be imported into any - * table on any cluster (subject to schema checks etc). - * See {@link #analyzeAcidExport(ASTNode)} - * @param tree Export statement - * @return true if exporting an Acid table. - */ - public static boolean isAcidExport(ASTNode tree) throws SemanticException { - assert tree != null && tree.getToken() != null && - tree.getToken().getType() == HiveParser.TOK_EXPORT; - Tree tokTab = tree.getChild(0); - assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB; - Table tableHandle = null; - try { - tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false); - } catch(HiveException ex) { - throw new SemanticException(ex); - } - - //tableHandle can be null if table doesn't exist - return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle); - } - private static String getTmptTableNameForExport(Table exportTable) { - String tmpTableDb = exportTable.getDbName(); - String tmpTableName = exportTable.getTableName() + "_" + - UUID.randomUUID().toString().replace('-', '_'); - return Warehouse.getQualifiedName(tmpTableDb, tmpTableName); - } - - /** - * See {@link #isAcidExport(ASTNode)} - * 1. create the temp table T - * 2. compile 'insert into T select * from acidTable' - * 3. compile 'export acidTable' (acidTable will be replaced with T during execution) - * 4. 
create task to drop T - * - * Using a true temp (session level) table means it should not affect replication and the table - * is not visible outside the Session that created for security - */ - private void analyzeAcidExport(ASTNode ast) throws SemanticException { - assert ast != null && ast.getToken() != null && - ast.getToken().getType() == HiveParser.TOK_EXPORT; - ASTNode tableTree = (ASTNode)ast.getChild(0); - assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB; - ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0); - Table exportTable = getTargetTable(tokRefOrNameExportTable); - assert AcidUtils.isFullAcidTable(exportTable); - - //need to create the table "manually" rather than creating a task since it has to exist to - // compile the insert into T... - String newTableName = getTmptTableNameForExport(exportTable); //this is db.table - Map tblProps = new HashMap<>(); - tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString()); - String location; - - // for temporary tables we set the location to something in the session's scratch dir - // it has the same life cycle as the tmp table - try { - // Generate a unique ID for temp table path. - // This path will be fixed for the life of the temp table. - Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString()); - path = Warehouse.getDnsPath(path, conf); - location = path.toString(); - } catch (MetaException err) { - throw new SemanticException("Error while generating temp table path:", err); - } - - CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName, - false, true, null, - null, location, null, null, - tblProps, - true, //important so we get an exception on name collision - Warehouse.getQualifiedName(exportTable.getTTable()), false); - Table newTable; - try { - ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName())); - inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security... - DDLTask createTableTask = (DDLTask) TaskFactory.get( - new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf); - createTableTask.setConf(conf); //above get() doesn't set it - createTableTask.execute(new DriverContext(new Context(conf))); - newTable = db.getTable(newTableName); - } catch(IOException|HiveException ex) { - throw new SemanticException(ex); - } - - //now generate insert statement - //insert into newTableName select * from ts - StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(), - tokRefOrNameExportTable, tableTree, newTableName); - ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); - Context rewrittenCtx = rr.rewrittenCtx; - rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery() - ASTNode rewrittenTree = rr.rewrittenTree; - try { - useSuper = true; - //newTable has to exist at this point to compile - super.analyze(rewrittenTree, rewrittenCtx); - } finally { - useSuper = false; - } - //now we have the rootTasks set up for Insert ... Select - removeStatsTasks(rootTasks); - //now make an ExportTask from temp table - /*analyzeExport() creates TableSpec which in turn tries to build - "public List partitions" by looking in the metastore to find Partitions matching - the partition spec in the Export command. These of course don't exist yet since we've not - ran the insert stmt yet!!!!!!! 
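A rough standalone sketch of the statement this rewrite compiles, i.e. copying the (filtered) rows of the ACID table into the flat temp table; the naive string quoting here is illustrative only, since the real code derives the filter from the parsed partition spec and genPartValueString.

import java.util.LinkedHashMap;
import java.util.Map;

class ExportRewriteSketch {
  static String exportInsert(String tmpTable, String acidTable, Map<String, String> partSpec) {
    StringBuilder sql = new StringBuilder("insert into ").append(tmpTable)
        .append(" select * from ").append(acidTable);
    String sep = " WHERE ";
    for (Map.Entry<String, String> e : partSpec.entrySet()) {
      if (e.getValue() == null) {
        continue; // partial spec: no filter on this column
      }
      sql.append(sep).append(e.getKey()).append(" = '").append(e.getValue()).append('\'');
      sep = " AND ";
    }
    return sql.toString();
  }

  public static void main(String[] args) {
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("ds", "2024-01-01");
    System.out.println(exportInsert("db.acidtbl_tmp", "db.acidtbl", spec));
    // insert into db.acidtbl_tmp select * from db.acidtbl WHERE ds = '2024-01-01'
  }
}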
- */ - Task exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName, - db, conf, inputs, outputs); - - AlterTableDesc alterTblDesc = null; - { - /** - * add an alter table task to set transactional props - * do it after populating temp table so that it's written as non-transactional table but - * update props before export so that export archive metadata has these props. This way when - * IMPORT is done for this archive and target table doesn't exist, it will be created as Acid. - */ - alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS); - HashMap mapProps = new HashMap<>(); - mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); - alterTblDesc.setProps(mapProps); - alterTblDesc.setOldName(newTableName); - } - addExportTask(rootTasks, exportTask, TaskFactory.get( - new DDLWork(getInputs(), getOutputs(), alterTblDesc))); - - { - /** - * Now make a task to drop temp table - * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType) - */ - ReplicationSpec replicationSpec = new ReplicationSpec(); - DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE, - false, true, replicationSpec); - Task dropTask = - TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf); - exportTask.addDependentTask(dropTask); - } - markReadEntityForUpdate(); - if(ctx.isExplainPlan()) { - try { - //so that "explain" doesn't "leak" tmp tables - // TODO: catalog - db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true); - } catch(HiveException ex) { - LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex); - } - } - } - /** - * generate - * insert into newTableName select * from ts - * for EXPORT command - */ - private StringBuilder generateExportQuery(List partCols, - ASTNode tokRefOrNameExportTable, ASTNode tableTree, String newTableName) - throws SemanticException { - StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName); - addPartitionColsToInsert(partCols, rewrittenQueryStr); - rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable)); - //builds partition spec so we can build suitable WHERE clause - TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true); - if(exportTableSpec.getPartSpec() != null) { - StringBuilder whereClause = null; - int partColsIdx = -1; //keep track of corresponding col in partCols - for(Map.Entry ent : exportTableSpec.getPartSpec().entrySet()) { - partColsIdx++; - if(ent.getValue() == null) { - continue; //partial spec - } - if(whereClause == null) { - whereClause = new StringBuilder(" WHERE "); - } - if(whereClause.length() > " WHERE ".length()) { - whereClause.append(" AND "); - } - whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf)) - .append(" = ").append(genPartValueString(partCols.get(partColsIdx).getType(), ent.getValue())); - } - if(whereClause != null) { - rewrittenQueryStr.append(whereClause); - } - } - return rewrittenQueryStr; - } - /** - * Makes the exportTask run after all other tasks of the "insert into T ..." are done. 
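The "run the export after everything else finishes" chaining can be pictured with a toy task tree; Node below is a stand-in for Hive's Task class and is not part of the patch.

import java.util.ArrayList;
import java.util.List;

class TaskChainSketch {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
    void addDependent(Node n) { children.add(n); }
  }

  // Walk the task tree and hang the follow-up task off every leaf, so it only
  // starts once the branch that leads to that leaf has completed.
  static void addAfterLeaves(List<Node> roots, Node followUp) {
    for (Node n : roots) {
      if (n.children.isEmpty()) {
        n.addDependent(followUp);
      } else {
        addAfterLeaves(n.children, followUp);
      }
    }
  }
}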
- */ - private void addExportTask(List> rootTasks, - Task exportTask, Task alterTable) { - for(Task t : rootTasks) { - if(t.getNumChild() <= 0) { - //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978 - t.addDependentTask(alterTable); - //this is a leaf so add exportTask to follow it - alterTable.addDependentTask(exportTask); - } else { - addExportTask(t.getDependentTasks(), exportTask, alterTable); - } - } - } - - private List> findStatsTasks( - List> rootTasks, List> statsTasks) { - for(Task t : rootTasks) { - if (t instanceof StatsTask) { - if(statsTasks == null) { - statsTasks = new ArrayList<>(); - } - statsTasks.add(t); - } - if(t.getDependentTasks() != null) { - statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks); - } + protected void analyze(ASTNode tree) throws SemanticException { + switch (tree.getToken().getType()) { + case HiveParser.TOK_DELETE_FROM: + analyzeDelete(tree); + break; + case HiveParser.TOK_UPDATE_TABLE: + analyzeUpdate(tree); + break; + default: + throw new RuntimeException("Asked to parse token " + tree.getName() + " in " + + "UpdateDeleteSemanticAnalyzer"); } - return statsTasks; } - private void removeStatsTasks(List> rootTasks) { - List> statsTasks = findStatsTasks(rootTasks, null); - if(statsTasks == null) { - return; - } - for (Task statsTask : statsTasks) { - if(statsTask.getParentTasks() == null) { - continue; //should never happen - } - for (Task t : new ArrayList<>(statsTask.getParentTasks())) { - t.removeDependentTask(statsTask); - } - } - } private void analyzeUpdate(ASTNode tree) throws SemanticException { - currentOperation = Context.Operation.UPDATE; + operation = Context.Operation.UPDATE; reparseAndSuperAnalyze(tree); } private void analyzeDelete(ASTNode tree) throws SemanticException { - currentOperation = Context.Operation.DELETE; + operation = Context.Operation.DELETE; reparseAndSuperAnalyze(tree); } /** - * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2 - * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2... - * @param target target table - */ - private void addPartitionColsToSelect(List partCols, StringBuilder rewrittenQueryStr, - ASTNode target) throws SemanticException { - String targetName = target != null ? getSimpleTableName(target) : null; - - // If the table is partitioned, we need to select the partition columns as well. - if (partCols != null) { - for (FieldSchema fschema : partCols) { - rewrittenQueryStr.append(", "); - //would be nice if there was a way to determine if quotes are needed - if(targetName != null) { - rewrittenQueryStr.append(targetName).append('.'); - } - rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf)); - } - } - } - /** - * Assert that we are not asked to update a bucketing column or partition column - * @param colName it's the A in "SET A = B" - */ - private void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throws SemanticException { - String columnName = normalizeColName(colName.getText()); - - // Make sure this isn't one of the partitioning columns, that's not supported. 
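A minimal standalone version of this SET-target validation, assuming plain column-name lists; the real code throws SemanticException with specific ErrorMsg codes rather than IllegalArgumentException.

import java.util.List;

// A column may be assigned only if it is a regular column: not a partition column,
// not a bucketing column, and it must exist on the target table.
class SetTargetCheckSketch {
  static void checkSetTarget(String column, List<String> regularCols,
      List<String> partCols, List<String> bucketCols) {
    String c = column.toLowerCase();
    if (partCols.stream().anyMatch(c::equalsIgnoreCase)) {
      throw new IllegalArgumentException("Cannot update partition column: " + column);
    }
    if (bucketCols.stream().anyMatch(c::equalsIgnoreCase)) {
      throw new IllegalArgumentException("Cannot update bucketing column: " + column);
    }
    if (regularCols.stream().noneMatch(c::equalsIgnoreCase)) {
      throw new IllegalArgumentException("Unknown target column in SET clause: " + column);
    }
  }
}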
- for (FieldSchema fschema : targetTable.getPartCols()) { - if (fschema.getName().equalsIgnoreCase(columnName)) { - throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg()); - } - } - //updating bucket column should move row from one file to another - not supported - if(targetTable.getBucketCols() != null && targetTable.getBucketCols().contains(columnName)) { - throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName); - } - boolean foundColumnInTargetTable = false; - for(FieldSchema col : targetTable.getCols()) { - if(columnName.equalsIgnoreCase(col.getName())) { - foundColumnInTargetTable = true; - break; - } - } - if(!foundColumnInTargetTable) { - throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(), - targetTable.getFullyQualifiedName()); - } - } - private ASTNode findLHSofAssignment(ASTNode assignment) { - assert assignment.getToken().getType() == HiveParser.EQUAL : - "Expected set assignments to use equals operator but found " + assignment.getName(); - ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0); - assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL : - "Expected left side of assignment to be table or column"; - ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0); - assert colName.getToken().getType() == HiveParser.Identifier : - "Expected column name"; - return colName; - } - private Map collectSetColumnsAndExpressions(ASTNode setClause, - Set setRCols, Table targetTable) throws SemanticException { - // An update needs to select all of the columns, as we rewrite the entire row. Also, - // we need to figure out which columns we are going to replace. - assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE : - "Expected second child of update token to be set token"; - - // Get the children of the set clause, each of which should be a column assignment - List assignments = setClause.getChildren(); - // Must be deterministic order map for consistent q-test output across Java versions - Map setCols = new LinkedHashMap(assignments.size()); - for (Node a : assignments) { - ASTNode assignment = (ASTNode)a; - ASTNode colName = findLHSofAssignment(assignment); - if(setRCols != null) { - addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols); - } - checkValidSetClauseTarget(colName, targetTable); - - String columnName = normalizeColName(colName.getText()); - // This means that in UPDATE T SET x = _something_ - // _something_ can be whatever is supported in SELECT _something_ - setCols.put(columnName, (ASTNode)assignment.getChildren().get(1)); - } - return setCols; - } - /** - * @return the Metastore representation of the target table - */ - private Table getTargetTable(ASTNode tabRef) throws SemanticException { - return getTable(tabRef, db, true); - } - /** - * @param throwException if false, return null if table doesn't exist, else throw - */ - private static Table getTable(ASTNode tabRef, Hive db, boolean throwException) - throws SemanticException { - String[] tableName; - Table mTable; - switch (tabRef.getType()) { - case HiveParser.TOK_TABREF: - tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0)); - break; - case HiveParser.TOK_TABNAME: - tableName = getQualifiedTableName(tabRef); - break; - default: - throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef); - } - try { - mTable = db.getTable(tableName[0], tableName[1], throwException); - } catch (InvalidTableException e) { - LOG.error("Failed to find table " + 
getDotName(tableName) + " got exception " - + e.getMessage()); - throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e); - } catch (HiveException e) { - LOG.error("Failed to find table " + getDotName(tableName) + " got exception " - + e.getMessage()); - throw new SemanticException(e.getMessage(), e); - } - return mTable; - } - // Walk through all our inputs and set them to note that this read is part of an update or a - // delete. - private void markReadEntityForUpdate() { - for (ReadEntity input : inputs) { - if(isWritten(input)) { - //todo: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock - //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity - //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries) - //so DbTxnManager skips Read lock on the ReadEntity.... - input.setUpdateOrDelete(true);//input.noLockNeeded()? - } - } - } - /** - * For updates, we need to set the column access info so that it contains information on - * the columns we are updating. - * (But not all the columns of the target table even though the rewritten query writes - * all columns of target table since that is an implmentation detail) - */ - private void setUpAccessControlInfoForUpdate(Table mTable, Map setCols) { - ColumnAccessInfo cai = new ColumnAccessInfo(); - for (String colName : setCols.keySet()) { - cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName); - } - setUpdateColumnAccessInfo(cai); - } - /** - * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to - * require the user to have authorization on that column. - */ - private void cleanUpMetaColumnAccessControl() { - //we do this for Update/Delete (incl Merge) because we introduce this column into the query - //as part of rewrite - if (columnAccessInfo != null) { - columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID); - } - } - /** - * Parse the newly generated SQL statement to get a new AST - */ - private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery) throws SemanticException { - // Set dynamic partitioning to nonstrict so that queries do not need any partition - // references. - // todo: this may be a perf issue as it prevents the optimizer.. or not - HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); - // Disable LLAP IO wrapper; doesn't propagate extra ACID columns correctly. 
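A toy illustration of the two settings flipped before the reparse, using a plain map as the "session config"; the parse call is a placeholder rather than Hive's API, and the second property name is an approximation (the patch references it via ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED).

import java.util.Map;

class ReparseConfSketch {
  static Object reparse(String rewrittenSql, Map<String, String> conf) {
    // The generated INSERT never lists partition values explicitly, so dynamic
    // partitioning must be allowed to run in nonstrict mode.
    conf.put("hive.exec.dynamic.partition.mode", "nonstrict");
    // The extra ACID columns (ROW__ID) are not propagated by the LLAP IO wrapper,
    // so it is switched off for the rewritten statement. Property name approximate.
    conf.put("hive.llap.io.row.wrapper.enabled", "false");
    return parse(rewrittenSql, conf);
  }

  private static Object parse(String sql, Map<String, String> conf) {
    return sql; // placeholder: a real implementation would return the parsed AST
  }
}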
- HiveConf.setBoolVar(conf, ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED, false); - // Parse the rewritten query string - Context rewrittenCtx; - try { - rewrittenCtx = new Context(conf); - rewrittenCtx.setHDFSCleanup(true); - // We keep track of all the contexts that are created by this query - // so we can clear them when we finish execution - ctx.addRewrittenStatementContext(rewrittenCtx); - } catch (IOException e) { - throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg()); - } - rewrittenCtx.setExplainConfig(ctx.getExplainConfig()); - rewrittenCtx.setExplainPlan(ctx.isExplainPlan()); - rewrittenCtx.setStatsSource(ctx.getStatsSource()); - rewrittenCtx.setPlanMapper(ctx.getPlanMapper()); - rewrittenCtx.setIsUpdateDeleteMerge(true); - rewrittenCtx.setCmd(rewrittenQueryStr.toString()); - - ASTNode rewrittenTree; - try { - LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">"); - rewrittenTree = ParseUtils.parse(rewrittenQueryStr.toString(), rewrittenCtx); - } catch (ParseException e) { - throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e); - } - return new ReparseResult(rewrittenTree, rewrittenCtx); - } - /** - * Assert it supports Acid write - */ - private void validateTargetTable(Table mTable) throws SemanticException { - if (mTable.getTableType() == TableType.VIRTUAL_VIEW || - mTable.getTableType() == TableType.MATERIALIZED_VIEW) { - LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view"); - throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg()); - } - } - /** * This supports update and delete statements + * Rewrite the delete or update into an insert. Crazy, but it works as deletes and update + * actually are inserts into the delta file in Hive. A delete + * DELETE FROM _tablename_ [WHERE ...] + * will be rewritten as + * INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID[, + * _partcols_] from _tablename_ SORT BY ROW__ID + * An update + * UPDATE _tablename_ SET x = _expr_ [WHERE...] + * will be rewritten as + * INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT _all_, + * _partcols_from _tablename_ SORT BY ROW__ID + * where _all_ is all the non-partition columns. The expressions from the set clause will be + * re-attached later. + * The where clause will also be re-attached later. + * The sort by clause is put in there so that records come out in the right order to enable + * merge on read. */ private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException { List children = tree.getChildren(); - // The first child should be the table we are deleting from + + // The first child should be the table we are updating / deleting from ASTNode tabName = (ASTNode)children.get(0); assert tabName.getToken().getType() == HiveParser.TOK_TABNAME : - "Expected tablename as first child of " + operation() + " but found " + tabName.getName(); - - // Rewrite the delete or update into an insert. Crazy, but it works as deletes and update - // actually are inserts into the delta file in Hive. A delete - // DELETE FROM _tablename_ [WHERE ...] - // will be rewritten as - // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT ROW__ID[, - // _partcols_] from _tablename_ SORT BY ROW__ID - // An update - // UPDATE _tablename_ SET x = _expr_ [WHERE...] - // will be rewritten as - // INSERT INTO TABLE _tablename_ [PARTITION (_partcols_)] SELECT _all_, - // _partcols_from _tablename_ SORT BY ROW__ID - // where _all_ is all the non-partition columns. 
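The rewrite shape described in the comment above can be sketched as naive string building over hypothetical column lists; the real code splices the original WHERE clause and SET expressions back in at the AST level, not as text.

import java.util.List;
import java.util.Map;

class UpdateDeleteRewriteSketch {
  // DELETE FROM t [WHERE ...]  ==>  INSERT INTO t [PARTITION(...)]
  //   SELECT ROW__ID[, partcols] FROM t [WHERE ...] SORT BY ROW__ID
  static String rewriteDelete(String table, List<String> partCols, String where) {
    return "insert into table " + table + partitionClause(partCols)
        + " select ROW__ID" + (partCols.isEmpty() ? "" : ", " + String.join(", ", partCols))
        + " from " + table + (where == null ? "" : " where " + where)
        + " sort by ROW__ID";
  }

  // UPDATE t SET c = expr [WHERE ...]  ==>  INSERT INTO t [PARTITION(...)]
  //   SELECT ROW__ID, <non-part cols, updated ones replaced by their expressions>[, partcols]
  //   FROM t [WHERE ...] SORT BY ROW__ID
  static String rewriteUpdate(String table, List<String> cols, List<String> partCols,
      Map<String, String> setExprs, String where) {
    StringBuilder sel = new StringBuilder("ROW__ID");
    for (String c : cols) {
      sel.append(", ").append(setExprs.getOrDefault(c.toLowerCase(), c));
    }
    for (String p : partCols) {
      sel.append(", ").append(p);
    }
    return "insert into table " + table + partitionClause(partCols)
        + " select " + sel + " from " + table
        + (where == null ? "" : " where " + where) + " sort by ROW__ID";
  }

  private static String partitionClause(List<String> partCols) {
    return partCols.isEmpty() ? "" : " partition (" + String.join(", ", partCols) + ")";
  }
}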
The expressions from the set clause will be - // re-attached later. - // The where clause will also be re-attached later. - // The sort by clause is put in there so that records come out in the right order to enable - // merge on read. - - StringBuilder rewrittenQueryStr = new StringBuilder(); + "Expected tablename as first child of " + operation + " but found " + tabName.getName(); Table mTable = getTargetTable(tabName); validateTargetTable(mTable); + StringBuilder rewrittenQueryStr = new StringBuilder(); rewrittenQueryStr.append("insert into table "); rewrittenQueryStr.append(getFullTableNameForSQL(tabName)); - addPartitionColsToInsert(mTable.getPartCols(), rewrittenQueryStr); rewrittenQueryStr.append(" select ROW__ID"); @@ -669,11 +155,10 @@ private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException { assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT : "Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName(); - if(updating()) { + if (updating()) { rewrittenCtx.setOperation(Context.Operation.UPDATE); rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.UPDATE); - } - else if(deleting()) { + } else if (deleting()) { rewrittenCtx.setOperation(Context.Operation.DELETE); rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.DELETE); } @@ -731,842 +216,18 @@ else if(deleting()) { // Add the setRCols to the input list for (String colName : setRCols) { - if(columnAccessInfo != null) {//assuming this means we are not doing Auth + if (columnAccessInfo != null) { //assuming this means we are not doing Auth columnAccessInfo.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), - colName); - } - } - } - } - /** - * Check that {@code readEntity} is also being written - */ - private boolean isWritten(Entity readEntity) { - for(Entity writeEntity : outputs) { - //make sure to compare them as Entity, i.e. that it's the same table or partition, etc - if(writeEntity.toString().equalsIgnoreCase(readEntity.toString())) { - return true; - } - } - return false; - } - private String operation() { - if (currentOperation == Context.Operation.OTHER) { - throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " + - "deleting, operation not known."); - } - return currentOperation.toString(); - } - - // This method finds any columns on the right side of a set statement (thus rcols) and puts them - // in a set so we can add them to the list of input cols to check. - private void addSetRCols(ASTNode node, Set setRCols) { - - // See if this node is a TOK_TABLE_OR_COL. If so, find the value and put it in the list. If - // not, recurse on any children - if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) { - ASTNode colName = (ASTNode)node.getChildren().get(0); - assert colName.getToken().getType() == HiveParser.Identifier : - "Expected column name"; - setRCols.add(normalizeColName(colName.getText())); - } else if (node.getChildren() != null) { - for (Node n : node.getChildren()) { - addSetRCols((ASTNode)n, setRCols); - } - } - } - - /** - * Column names are stored in metastore in lower case, regardless of the CREATE TABLE statement. - * Unfortunately there is no single place that normalizes the input query. 
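A toy version of the right-hand-side column collection done by addSetRCols, including the lower-casing just described; Expr is a minimal stand-in for ASTNode, with column nodes playing the role of TOK_TABLE_OR_COL.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SetRhsColumnsSketch {
  static class Expr {
    final String column;                    // non-null only for a column reference
    final List<Expr> children = new ArrayList<>();
    Expr(String column) { this.column = column; }
  }

  // Walk the expression tree and record every referenced column, lower-cased.
  static void collectColumns(Expr node, Set<String> out) {
    if (node.column != null) {
      out.add(node.column.toLowerCase());
    }
    for (Expr child : node.children) {
      collectColumns(child, out);
    }
  }

  public static void main(String[] args) {
    Expr plus = new Expr(null);             // e.g. the RHS "B + price"
    plus.children.add(new Expr("B"));
    plus.children.add(new Expr("price"));
    Set<String> cols = new LinkedHashSet<>();
    collectColumns(plus, cols);
    System.out.println(cols);               // [b, price]
  }
}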
- * @param colName not null - */ - private static String normalizeColName(String colName) { - return colName.toLowerCase(); - } - - private Context.Operation currentOperation = Context.Operation.OTHER; - private static final String Indent = " "; - - private IdentifierQuoter quotedIdenfierHelper; - - /** - * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it. - * Since HiveLexer.g is written such that it strips away any ` (back ticks) around - * quoted identifiers we need to add those back to generated SQL. - * Additionally, the parser only produces tokens of type Identifier and never - * QuotedIdentifier (HIVE-6013). So here we just quote all identifiers. - * (') around String literals are retained w/o issues - */ - private static class IdentifierQuoter { - private final TokenRewriteStream trs; - private final IdentityHashMap visitedNodes = new IdentityHashMap<>(); - IdentifierQuoter(TokenRewriteStream trs) { - this.trs = trs; - if(trs == null) { - throw new IllegalArgumentException("Must have a TokenRewriteStream"); - } - } - private void visit(ASTNode n) { - if(n.getType() == HiveParser.Identifier) { - if(visitedNodes.containsKey(n)) { - /** - * Since we are modifying the stream, it's not idempotent. Ideally, the caller would take - * care to only quote Identifiers in each subtree once, but this makes it safe - */ - return; + colName); } - visitedNodes.put(n, n); - trs.insertBefore(n.getToken(), "`"); - trs.insertAfter(n.getToken(), "`"); - } - if(n.getChildCount() <= 0) {return;} - for(Node c : n.getChildren()) { - visit((ASTNode)c); } } } - /** - * This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without - * needing to understand what it is (except for QuotedIdentifiers) - * - */ - private String getMatchedText(ASTNode n) { - quotedIdenfierHelper.visit(n); - return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(), - n.getTokenStopIndex() + 1).trim(); - } - /** - * Here we take a Merge statement AST and generate a semantically equivalent multi-insert - * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible, - * the new SQL statement is made to look like the input SQL statement so that it's easier to map - * Query Compiler errors from generated SQL to original one this way. - * The generated SQL is a complete representation of the original input for the same reason. - * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates. - * If generated SQL doesn't have everything and is patched up later, these coordinates point to - * the wrong place. - * - * @throws SemanticException - */ - private void analyzeMerge(ASTNode tree) throws SemanticException { - currentOperation = Context.Operation.MERGE; - quotedIdenfierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream()); - /* - * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST - For example, given: - merge into acidTbl using nonAcidPart2 source ON acidTbl.a = source.a2 - WHEN MATCHED THEN UPDATE set b = source.b2 - WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) - - We get AST like this: - "(tok_merge " + - "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " + - "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " + - "(tok_matched " + - "(tok_update " + - "(tok_set_columns_clause (= (tok_table_or_col b) (. 
(tok_table_or_col source) b2))))) " + - "(tok_not_matched " + - "tok_insert " + - "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))"); - - And need to produce a multi-insert like this to execute: - FROM acidTbl right outer join nonAcidPart2 ON acidTbl.a = source.a2 - Insert into table acidTbl select nonAcidPart2.a2, nonAcidPart2.b2 where acidTbl.a is null - INSERT INTO TABLE acidTbl select target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2 where nonAcidPart2.a2=acidTbl.a sort by acidTbl.ROW__ID - */ - /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if - original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but - the errors will point at locations that the user can't map to anything - - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last in Select clause of Insert as Select - todo: do we care to preserve comments in original SQL? - todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent - Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse... - todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty? This should be a runtime error - maybe not - the outer side of ROJ is empty => the join produces 0 rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error - */ - ASTNode target = (ASTNode)tree.getChild(0); - ASTNode source = (ASTNode)tree.getChild(1); - String targetName = getSimpleTableName(target); - String sourceName = getSimpleTableName(source); - ASTNode onClause = (ASTNode) tree.getChild(2); - String onClauseAsText = getMatchedText(onClause); - - int whenClauseBegins = 3; - boolean hasHint = false; - // query hint - ASTNode qHint = (ASTNode) tree.getChild(3); - if (qHint.getType() == HiveParser.QUERY_HINT) { - hasHint = true; - whenClauseBegins++; - } - Table targetTable = getTargetTable(target); - validateTargetTable(targetTable); - List whenClauses = findWhenClauses(tree, whenClauseBegins); - - StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n"); - - rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(target)); - if(isAliased(target)) { - rewrittenQueryStr.append(" ").append(targetName); - } - rewrittenQueryStr.append('\n'); - rewrittenQueryStr.append(Indent).append(chooseJoinType(whenClauses)).append("\n"); - if(source.getType() == HiveParser.TOK_SUBQUERY) { - //this includes the mandatory alias - rewrittenQueryStr.append(Indent).append(getMatchedText(source)); - } - else { - rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(source)); - if(isAliased(source)) { - rewrittenQueryStr.append(" ").append(sourceName); - } - } - rewrittenQueryStr.append('\n'); - rewrittenQueryStr.append(Indent).append("ON ").append(onClauseAsText).append('\n'); - - // Add the hint if any - String hintStr = null; - if (hasHint) { - hintStr = " /*+ " + qHint.getText() + " */ "; - } - - /** - * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete - * If we have both update and delete, the 1st one (in SQL code) must have "AND " - * so that the 2nd can ensure not to process the same rows. - * Update and Delete may be in any order. 
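For readability, the multi-insert that the MERGE example above expands to looks roughly like the text block below; column lists, aliases and predicates are simplified, and the real rewrite may also append the cardinality-violation branch when that check is enabled.

class MergeRewriteShape {
  static final String MULTI_INSERT = """
      FROM acidTbl RIGHT OUTER JOIN nonAcidPart2 source ON acidTbl.a = source.a2
      INSERT INTO acidTbl            -- WHEN MATCHED THEN UPDATE
        SELECT acidTbl.ROW__ID, acidTbl.a, source.b2
        WHERE acidTbl.a = source.a2
        SORT BY acidTbl.ROW__ID
      INSERT INTO acidTbl            -- WHEN NOT MATCHED THEN INSERT
        SELECT source.a2, source.b2
        WHERE acidTbl.a IS NULL
      """;
}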
(Insert is always last) - */ - String extraPredicate = null; - int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0; - int numInsertClauses = 0; - boolean hintProcessed = false; - for(ASTNode whenClause : whenClauses) { - switch (getWhenClauseOperation(whenClause).getType()) { - case HiveParser.TOK_INSERT: - numInsertClauses++; - handleInsert(whenClause, rewrittenQueryStr, target, onClause, - targetTable, targetName, onClauseAsText, hintProcessed ? null : hintStr); - hintProcessed = true; - break; - case HiveParser.TOK_UPDATE: - numWhenMatchedUpdateClauses++; - String s = handleUpdate(whenClause, rewrittenQueryStr, target, - onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr); - hintProcessed = true; - if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { - extraPredicate = s;//i.e. it's the 1st WHEN MATCHED - } - break; - case HiveParser.TOK_DELETE: - numWhenMatchedDeleteClauses++; - String s1 = handleDelete(whenClause, rewrittenQueryStr, target, - onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr); - hintProcessed = true; - if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { - extraPredicate = s1;//i.e. it's the 1st WHEN MATCHED - } - break; - default: - throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() + - addParseInfo(whenClause)); - } - if(numWhenMatchedDeleteClauses > 1) { - throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd()); - } - if(numWhenMatchedUpdateClauses > 1) { - throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd()); - } - assert numInsertClauses < 2: "too many Insert clauses"; - } - if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { - throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); - } - - boolean validating = handleCardinalityViolation(rewrittenQueryStr, target, onClauseAsText, - targetTable, numWhenMatchedDeleteClauses == 0 && numWhenMatchedUpdateClauses == 0); - ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd()); - Context rewrittenCtx = rr.rewrittenCtx; - ASTNode rewrittenTree = rr.rewrittenTree; - rewrittenCtx.setOperation(Context.Operation.MERGE); - - //set dest name mapping on new context; 1st chid is TOK_FROM - for(int insClauseIdx = 1, whenClauseIdx = 0; - insClauseIdx < rewrittenTree.getChildCount() - (validating ? 
1 : 0/*skip cardinality violation clause*/); - insClauseIdx++, whenClauseIdx++) { - //we've added Insert clauses in order or WHEN items in whenClauses - switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) { - case HiveParser.TOK_INSERT: - rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT); - break; - case HiveParser.TOK_UPDATE: - rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE); - break; - case HiveParser.TOK_DELETE: - rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE); - break; - default: - assert false; - } - } - if(validating) { - //here means the last branch of the multi-insert is Cardinality Validation - rewrittenCtx.addDestNamePrefix(rewrittenTree.getChildCount() - 1, Context.DestClausePrefix.INSERT); - } - - try { - useSuper = true; - super.analyze(rewrittenTree, rewrittenCtx); - } finally { - useSuper = false; - } - updateOutputs(targetTable); - } - - /** - * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check - * if the read and write are of the same table in "insert ... select ....". Since DbTxnManager - * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we - * have change the table WriteEntity to a set of partition WriteEntity objects based on - * ReadEntity objects computed for this table. - */ - private void updateOutputs(Table targetTable) { - markReadEntityForUpdate(); - - if(targetTable.isPartitioned()) { - List partitionsRead = getRestrictedPartitionSet(targetTable); - if(!partitionsRead.isEmpty()) { - //if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with - //WriteEntity for each partition - List toRemove = new ArrayList<>(); - for(WriteEntity we : outputs) { - WriteEntity.WriteType wt = we.getWriteType(); - if(isTargetTable(we, targetTable) && - (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) { - /** - * The assumption here is that SemanticAnalyzer will will generate ReadEntity for each - * partition that exists and is matched by the WHERE clause (which may be all of them). - * Since we don't allow updating the value of a partition column, we know that we always - * write the same (or fewer) partitions than we read. Still, the write is a Dynamic - * Partition write - see HIVE-15032. - */ - toRemove.add(we); - } - } - outputs.removeAll(toRemove); - // TODO: why is this like that? - for(ReadEntity re : partitionsRead) { - for(WriteEntity original : toRemove) { - //since we may have both Update and Delete branches, Auth needs to know - WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType()); - we.setDynamicPartitionWrite(original.isDynamicPartitionWrite()); - outputs.add(we); - } - } - } - } - } - /** - * If the optimizer has determined that it only has to read some of the partitions of the - * target table to satisfy the query, then we know that the write side of update/delete - * (and update/delete parts of merge) - * can only write (at most) that set of partitions (since we currently don't allow updating - * partition (or bucket) columns). So we want to replace the table level - * WriteEntity in the outputs with WriteEntity for each of these partitions - * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any - * insert which does a select against the same table. Then SemanticAnalyzer would also - * be able to not use DP for the Insert... 
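A tiny stand-in for this partition filter, with an illustrative ReadItem record rather than Hive's ReadEntity: keep only reads that come from the top-level query, are partition-level, and belong to the target table.

import java.util.List;
import java.util.stream.Collectors;

class RestrictedPartitionsSketch {
  record ReadItem(String table, String partition, boolean fromTopLevelQuery) {}

  static List<ReadItem> restrictedPartitions(String targetTable, List<ReadItem> inputs) {
    return inputs.stream()
        .filter(r -> r.fromTopLevelQuery() && r.partition() != null
            && r.table().equals(targetTable))
        .collect(Collectors.toList());
  }
}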
- * - * Note that the Insert of Merge may be creating new partitions and writing to partitions - * which were not read (WHEN NOT MATCHED...). WriteEntity for that should be created - * in MoveTask (or some other task after the query is complete) - */ - private List getRestrictedPartitionSet(Table targetTable) { - List partitionsRead = new ArrayList<>(); - for(ReadEntity re : inputs) { - if(re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) { - partitionsRead.add(re); - } - } - return partitionsRead; - } - /** - * if there is no WHEN NOT MATCHED THEN INSERT, we don't outer join - */ - private String chooseJoinType(List whenClauses) { - for(ASTNode whenClause : whenClauses) { - if(getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) { - return "RIGHT OUTER JOIN"; - } - } - return "INNER JOIN"; - } - /** - * does this Entity belong to target table (partition) - */ - private boolean isTargetTable(Entity entity, Table targetTable) { - //todo: https://issues.apache.org/jira/browse/HIVE-15048 - /** - * is this the right way to compare? Should it just compare paths? - * equals() impl looks heavy weight - */ - return targetTable.equals(entity.getTable()); - } - - /** - * Per SQL Spec ISO/IEC 9075-2:2011(E) Section 14.2 under "General Rules" Item 6/Subitem a/Subitem 2/Subitem B, - * an error should be raised if > 1 row of "source" matches the same row in "target". - * This should not affect the runtime of the query as it's running in parallel with other - * branches of the multi-insert. It won't actually write any data to merge_tmp_table since the - * cardinality_violation() UDF throws an error whenever it's called killing the query - * @return true if another Insert clause was added - */ - private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTNode target, - String onClauseAsString, Table targetTable, - boolean onlyHaveWhenNotMatchedClause) - throws SemanticException { - if(!conf.getBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK)) { - LOG.info("Merge statement cardinality violation check is disabled: " + - HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK.varname); - return false; - } - if(onlyHaveWhenNotMatchedClause) { - //if no update or delete in Merge, there is no need to to do cardinality check - return false; - } - //this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a - // given session, i.e. the name can be fixed across all invocations - String tableName = "merge_tmp_table"; - rewrittenQueryStr.append("\nINSERT INTO ").append(tableName) - .append("\n SELECT cardinality_violation(") - .append(getSimpleTableName(target)).append(".ROW__ID"); - addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); - - rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString) - .append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID"); - - addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); - - rewrittenQueryStr.append(" HAVING count(*) > 1"); - //say table T has partition p, we are generating - //select cardinality_violation(ROW_ID, p) WHERE ... 
GROUP BY ROW__ID, p - //the Group By args are passed to cardinality_violation to add the violating value to the error msg - try { - if (null == db.getTable(tableName, false)) { - StorageFormat format = new StorageFormat(conf); - format.processStorageFormat("TextFile"); - Table table = db.newTable(tableName); - table.setSerializationLib(format.getSerde()); - List fields = new ArrayList(); - fields.add(new FieldSchema("val", "int", null)); - table.setFields(fields); - table.setDataLocation(Warehouse.getDnsPath(new Path(SessionState.get().getTempTableSpace(), - tableName), conf)); - table.getTTable().setTemporary(true); - table.setStoredAsSubDirectories(false); - table.setInputFormatClass(format.getInputFormat()); - table.setOutputFormatClass(format.getOutputFormat()); - db.createTable(table, true); - } - } - catch(HiveException|MetaException e) { - throw new SemanticException(e.getMessage(), e); - } - return true; - } - /** - * @param onClauseAsString - because there is no clone() and we need to use in multiple places - * @param deleteExtraPredicate - see notes at caller - */ - private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, - ASTNode target, String onClauseAsString, Table targetTable, - String deleteExtraPredicate, String hintStr) throws SemanticException { - assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED; - assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE; - String targetName = getSimpleTableName(target); - rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); - addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr); - rewrittenQueryStr.append(" -- update clause\n SELECT "); - if (hintStr != null) { - rewrittenQueryStr.append(hintStr); - } - rewrittenQueryStr.append(targetName).append(".ROW__ID"); - - ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0); - //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions - //before reparsing, i.e. they are known to SemanticAnalyzer logic - Map setColsExprs = collectSetColumnsAndExpressions(setClause, null, targetTable); - //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end up with - //insert into target (p1) select current_date(), 5, c3, p1 where .... - //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table names - List nonPartCols = targetTable.getCols(); - for(FieldSchema fs : nonPartCols) { - rewrittenQueryStr.append(", "); - String name = fs.getName(); - if (setColsExprs.containsKey(name)) { - String rhsExp = getMatchedText(setColsExprs.get(name)); - //"set a=5, b=8" - rhsExp picks up the next char (e.g. 
',') from the token stream - switch (rhsExp.charAt(rhsExp.length() - 1)) { - case ',': - case '\n': - rhsExp = rhsExp.substring(0, rhsExp.length() - 1); - } - rewrittenQueryStr.append(rhsExp); - } - else { - rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf)); - } - } - addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target); - rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); - String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause); - if(extraPredicate != null) { - //we have WHEN MATCHED AND THEN DELETE - rewrittenQueryStr.append(" AND ").append(extraPredicate); - } - if(deleteExtraPredicate != null) { - rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")"); - } - rewrittenQueryStr.append("\n SORT BY "); - rewrittenQueryStr.append(targetName).append(".ROW__ID \n"); - - setUpAccessControlInfoForUpdate(targetTable, setColsExprs); - //we don't deal with columns on RHS of SET expression since the whole expr is part of the - //rewritten SQL statement and is thus handled by SemanticAnalzyer. Nor do we have to - //figure which cols on RHS are from source and which from target - - return extraPredicate; - } - /** - * @param onClauseAsString - because there is no clone() and we need to use in multiple places - * @param updateExtraPredicate - see notes at caller - */ - private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target, - String onClauseAsString, Table targetTable, String updateExtraPredicate, String hintStr) throws SemanticException { - assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED; - assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE; - List partCols = targetTable.getPartCols(); - String targetName = getSimpleTableName(target); - rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target)); - addPartitionColsToInsert(partCols, rewrittenQueryStr); - - rewrittenQueryStr.append(" -- delete clause\n SELECT "); - if (hintStr != null) { - rewrittenQueryStr.append(hintStr); - } - rewrittenQueryStr.append(targetName).append(".ROW__ID "); - addPartitionColsToSelect(partCols, rewrittenQueryStr, target); - rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString); - String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause); - if(extraPredicate != null) { - //we have WHEN MATCHED AND THEN DELETE - rewrittenQueryStr.append(" AND ").append(extraPredicate); - } - if(updateExtraPredicate != null) { - rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")"); - } - rewrittenQueryStr.append("\n SORT BY "); - rewrittenQueryStr.append(targetName).append(".ROW__ID \n"); - return extraPredicate; - } - private static String addParseInfo(ASTNode n) { - return " at " + ErrorMsg.renderPosition(n); - } - - /** - * Returns the table name to use in the generated query preserving original quotes/escapes if any - * @see #getFullTableNameForSQL(ASTNode) - */ - private String getSimpleTableName(ASTNode n) throws SemanticException { - return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf); - } - private String getSimpleTableNameBase(ASTNode n) throws SemanticException { - switch (n.getType()) { - case HiveParser.TOK_TABREF: - int aliasIndex = findTabRefIdxs(n)[0]; - if (aliasIndex != 0) { - return n.getChild(aliasIndex).getText();//the alias - } - return getSimpleTableNameBase((ASTNode) 
-  private static String addParseInfo(ASTNode n) {
-    return " at " + ErrorMsg.renderPosition(n);
-  }
-
-  /**
-   * Returns the table name to use in the generated query, preserving original quotes/escapes if any
-   * @see #getFullTableNameForSQL(ASTNode)
-   */
-  private String getSimpleTableName(ASTNode n) throws SemanticException {
-    return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
-  }
-  private String getSimpleTableNameBase(ASTNode n) throws SemanticException {
-    switch (n.getType()) {
-      case HiveParser.TOK_TABREF:
-        int aliasIndex = findTabRefIdxs(n)[0];
-        if (aliasIndex != 0) {
-          return n.getChild(aliasIndex).getText();//the alias
-        }
-        return getSimpleTableNameBase((ASTNode) n.getChild(0));
-      case HiveParser.TOK_TABNAME:
-        if(n.getChildCount() == 2) {
-          //db.table -> return table
-          return n.getChild(1).getText();
-        }
-        return n.getChild(0).getText();
-      case HiveParser.TOK_SUBQUERY:
-        return n.getChild(1).getText();//the alias
-      default:
-        throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
-    }
-  }
-
-  private static final class ReparseResult {
-    private final ASTNode rewrittenTree;
-    private final Context rewrittenCtx;
-    ReparseResult(ASTNode n, Context c) {
-      rewrittenTree = n;
-      rewrittenCtx = c;
-    }
-  }
-
-  private boolean isAliased(ASTNode n) {
-    switch (n.getType()) {
-      case HiveParser.TOK_TABREF:
-        return findTabRefIdxs(n)[0] != 0;
-      case HiveParser.TOK_TABNAME:
-        return false;
-      case HiveParser.TOK_SUBQUERY:
-        assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
-        return true;
-      default:
-        throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
-    }
-  }
-  /**
-   * Collect WHEN clauses from Merge statement AST
-   */
-  private List<ASTNode> findWhenClauses(ASTNode tree, int start) throws SemanticException {
-    assert tree.getType() == HiveParser.TOK_MERGE;
-    List<ASTNode> whenClauses = new ArrayList<>();
-    for(int idx = start; idx < tree.getChildCount(); idx++) {
-      ASTNode whenClause = (ASTNode)tree.getChild(idx);
-      assert whenClause.getType() == HiveParser.TOK_MATCHED ||
-          whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
-          "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
-      whenClauses.add(whenClause);
-    }
-    if(whenClauses.size() <= 0) {
-      //Futureproofing: the parser will actually not allow this
-      throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
-    }
-    return whenClauses;
-  }
-  private ASTNode getWhenClauseOperation(ASTNode whenClause) {
-    if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
-      throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
-    }
-    return (ASTNode) whenClause.getChild(0);
-  }
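These helpers are consumed by the MERGE rewrite loop; the calling pattern is roughly the following sketch (the local variable names are assumptions, not the literal caller code):

// Sketch: dispatching the collected WHEN clauses to the handlers defined above.
List<ASTNode> whenClauses = findWhenClauses(tree, whenClauseBegins);
for (ASTNode whenClause : whenClauses) {
  switch (getWhenClauseOperation(whenClause).getType()) {
    case HiveParser.TOK_UPDATE:
      extraPredicate = handleUpdate(whenClause, rewrittenQueryStr, target, onClauseAsString,
          targetTable, deleteExtraPredicate, hintStr);
      break;
    case HiveParser.TOK_DELETE:
      extraPredicate = handleDelete(whenClause, rewrittenQueryStr, target, onClauseAsString,
          targetTable, updateExtraPredicate, hintStr);
      break;
    case HiveParser.TOK_INSERT:
      handleInsert(whenClause, rewrittenQueryStr, target, onClause, targetTable,
          targetTableNameInSourceQuery, onClauseAsString, hintStr);
      break;
    default:
      throw new IllegalStateException("Unexpected WHEN clause type");
  }
}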
-  /**
-   * Returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
-   * @return may be null
-   */
-  private String getWhenClausePredicate(ASTNode whenClause) {
-    if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
-      throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
-    }
-    if(whenClause.getChildCount() == 2) {
-      return getMatchedText((ASTNode)whenClause.getChild(1));
-    }
-    return null;
-  }
-  /**
-   * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause
-   * @param targetTableNameInSourceQuery - simple name/alias
-   * @throws SemanticException
-   */
-  private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
-      ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery,
-      String onClauseAsString, String hintStr) throws SemanticException {
-    ASTNode whenClauseOperation = getWhenClauseOperation(whenNotMatchedClause);
-    assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
-    assert whenClauseOperation.getType() == HiveParser.TOK_INSERT;
-
-    // identify the node that contains the values to insert and the optional column list node
-    ArrayList<Node> children = whenClauseOperation.getChildren();
-    ASTNode valuesNode =
-        (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_FUNCTION).findFirst().get();
-    ASTNode columnListNode =
-        (ASTNode)children.stream().filter(n -> ((ASTNode)n).getType() == HiveParser.TOK_TABCOLNAME).findFirst()
-        .orElse(null);
-
-    // if column list is specified, then it has to have the same number of elements as the values
-    // valuesNode has a child for struct, the rest are the columns
-    if (columnListNode != null && columnListNode.getChildCount() != (valuesNode.getChildCount() - 1)) {
-      throw new SemanticException(String.format("Column schema must have the same length as values (%d vs %d)",
-          columnListNode.getChildCount(), valuesNode.getChildCount() - 1));
-    }
-
-    rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
-    if (columnListNode != null) {
-      rewrittenQueryStr.append(' ').append(getMatchedText(columnListNode));
-    }
-    addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
-
-    rewrittenQueryStr.append(" -- insert clause\n SELECT ");
-    if (hintStr != null) {
-      rewrittenQueryStr.append(hintStr);
-    }
-
-    OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery,
-        conf, onClauseAsString);
-    oca.analyze();
-
-    String valuesClause = getMatchedText(valuesNode);
-    valuesClause = valuesClause.substring(1, valuesClause.length() - 1);//strip '(' and ')'
-    valuesClause = replaceDefaultKeywordForMerge(valuesClause, targetTable, columnListNode);
-    rewrittenQueryStr.append(valuesClause).append("\n WHERE ").append(oca.getPredicate());
-
-    String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
-    if (extraPredicate != null) {
-      //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
-      rewrittenQueryStr.append(" AND ")
-          .append(getMatchedText(((ASTNode)whenNotMatchedClause.getChild(1)))).append('\n');
-    }
-  }
-
-  private String replaceDefaultKeywordForMerge(String valueClause, Table table, ASTNode columnListNode)
-      throws SemanticException {
-    if (!valueClause.toLowerCase().contains("`default`")) {
-      return valueClause;
-    }
-
-    Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(table);
-    String[] values = valueClause.trim().split(",");
-    String[] replacedValues = new String[values.length];
-
-    // the list of the column names may be set in the query
-    String[] columnNames = columnListNode == null ?
-        table.getAllCols().stream().map(f -> f.getName()).toArray(size -> new String[size]) :
-        columnListNode.getChildren().stream().map(n -> ((ASTNode)n).toString()).toArray(size -> new String[size]);
-
-    for (int i = 0; i < values.length; i++) {
-      if (values[i].trim().toLowerCase().equals("`default`")) {
-        replacedValues[i] = MapUtils.getString(colNameToDefaultConstraint, columnNames[i], "null");
-      } else {
-        replacedValues[i] = values[i];
-      }
-    }
-    return StringUtils.join(replacedValues, ',');
+  private boolean updating() {
+    return operation == Context.Operation.UPDATE;
   }
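A worked example of the DEFAULT substitution above, with an invented column list, default constraint and values clause:

// Sketch: what the DEFAULT rewrite does to one VALUES list (illustrative literals only).
Map<String, String> defaults = new HashMap<>();
defaults.put("b", "7");                           // column b has DEFAULT 7
String[] columnNames = {"a", "b"};                // from the column list, or all table columns
String[] values = "s.x, `default`".split(",");    // values clause with '(' and ')' already stripped
String[] replaced = new String[values.length];
for (int i = 0; i < values.length; i++) {
  replaced[i] = values[i].trim().equalsIgnoreCase("`default`")
      ? defaults.getOrDefault(columnNames[i], "null")   // unknown default falls back to the literal null
      : values[i];
}
// replaced == ["s.x", "7"]; joined with ',' it is spliced back into the insert leg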
-
-  /**
-   * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume that 'c' is from
-   * target table and 'd' is from source expression. In order to properly
-   * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
-   * clause of this Insert contains "target.a is null and target.c is null". This ensures that this
-   * Insert leg does not receive any rows that are processed by Insert corresponding to
-   * WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if an
-   * unqualified column is part of the target table. We can get away with this simple logic because
-   * we know that target is always a table (as opposed to some derived table).)
-   * The job of this class is to generate this predicate.
-   *
-   * Note that this predicate cannot simply be NOT(on-clause-expr). If on-clause-expr evaluates
-   * to Unknown, it will be treated as False in the WHEN MATCHED Inserts but NOT(Unknown) = Unknown,
-   * and so it will be False for WHEN NOT MATCHED Insert...
-   */
-  private static final class OnClauseAnalyzer {
-    private final ASTNode onClause;
-    private final Map<String, List<String>> table2column = new HashMap<>();
-    private final List<String> unresolvedColumns = new ArrayList<>();
-    private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
-    private final Set<String> tableNamesFound = new HashSet<>();
-    private final String targetTableNameInSourceQuery;
-    private final HiveConf conf;
-    private final String onClauseAsString;
-    /**
-     * @param targetTableNameInSourceQuery alias or simple name
-     */
-    OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery,
-        HiveConf conf, String onClauseAsString) {
-      this.onClause = onClause;
-      allTargetTableColumns.addAll(targetTable.getCols());
-      allTargetTableColumns.addAll(targetTable.getPartCols());
-      this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
-      this.conf = conf;
-      this.onClauseAsString = onClauseAsString;
-    }
-    /**
-     * finds all columns and groups by table ref (if there is one)
-     */
-    private void visit(ASTNode n) {
-      if(n.getType() == HiveParser.TOK_TABLE_OR_COL) {
-        ASTNode parent = (ASTNode) n.getParent();
-        if(parent != null && parent.getType() == HiveParser.DOT) {
-          //the ref must be a table, so look for column name as right child of DOT
-          if(parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
-            //I don't think this can happen... but just in case
-            throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClauseAsString);
-          }
-          addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
-        }
-        else {
-          //must be just a column name
-          unresolvedColumns.add(n.getChild(0).getText());
-        }
-      }
-      if(n.getChildCount() == 0) {
-        return;
-      }
-      for(Node child : n.getChildren()) {
-        visit((ASTNode)child);
-      }
-    }
-    private void analyze() {
-      visit(onClause);
-      if(tableNamesFound.size() > 2) {
-        throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " +
-            tableNamesFound + " in " + onClauseAsString);
-      }
-      handleUnresolvedColumns();
-      if(tableNamesFound.size() > 2) {
-        throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " +
-            "Found " + tableNamesFound + " in " + onClauseAsString);
-      }
-    }
-    /**
-     * Find those that belong to target table
-     */
-    private void handleUnresolvedColumns() {
-      if(unresolvedColumns.isEmpty()) { return; }
-      for(String c : unresolvedColumns) {
-        for(FieldSchema fs : allTargetTableColumns) {
-          if(c.equalsIgnoreCase(fs.getName())) {
-            //c belongs to target table; strictly speaking there may be an ambiguous ref but
-            //this will be caught later when multi-insert is parsed
-            addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
-            break;
-          }
-        }
-      }
-    }
-    private void addColumn2Table(String tableName, String columnName) {
-      tableName = tableName.toLowerCase();//normalize name for mapping
-      tableNamesFound.add(tableName);
-      List<String> cols = table2column.get(tableName);
-      if(cols == null) {
-        cols = new ArrayList<>();
-        table2column.put(tableName, cols);
-      }
-      //we want to preserve 'columnName' as it was in original input query so that rewrite
-      //looks as much as possible like original query
-      cols.add(columnName);
-    }
-    /**
-     * Now generate the predicate for Where clause
-     */
-    private String getPredicate() {
-      //normalize table name for mapping
-      List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
-      if(targetCols == null) {
-        /*e.g. ON source.t=1
-         * this is not strictly speaking invalid but it does ensure that all columns from target
-         * table are all NULL for every row. This would make any WHEN MATCHED clause invalid since
-         * we don't have a ROW__ID. The WHEN NOT MATCHED could be meaningful but it's just data from
-         * source satisfying source.t=1... not worth the effort to support this*/
-        throw new IllegalArgumentException(ErrorMsg.INVALID_TABLE_IN_ON_CLAUSE_OF_MERGE
-            .format(targetTableNameInSourceQuery, onClauseAsString));
-      }
-      StringBuilder sb = new StringBuilder();
-      for(String col : targetCols) {
-        if(sb.length() > 0) {
-          sb.append(" AND ");
-        }
-        //but preserve table name in SQL
-        sb.append(HiveUtils.unparseIdentifier(targetTableNameInSourceQuery, conf)).append(".")
-            .append(HiveUtils.unparseIdentifier(col, conf)).append(" IS NULL");
-      }
-      return sb.toString();
-    }
+  private boolean deleting() {
+    return operation == Context.Operation.DELETE;
   }
 }
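Tying the class together with an invented example: for ON target.a = source.b AND c = d, where c resolves to the target table, getPredicate() produces the guard below for the WHEN NOT MATCHED leg; simply negating the ON clause would not work because of three-valued logic:

// Sketch: the guard generated for target columns {a, c} (identifiers illustrative).
List<String> targetCols = Arrays.asList("a", "c");
StringBuilder sb = new StringBuilder();
for (String col : targetCols) {
  if (sb.length() > 0) {
    sb.append(" AND ");
  }
  sb.append("target.").append(col).append(" IS NULL");
}
// sb -> "target.a IS NULL AND target.c IS NULL"
// NOT(target.a = source.b AND c = d) would be wrong: when the ON clause evaluates to Unknown the
// WHEN MATCHED legs already drop the row, but NOT(Unknown) is still Unknown, so the
// WHEN NOT MATCHED leg would drop it as well and the source row would be lost.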
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
index 423ca2a..93641af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
@@ -104,7 +104,7 @@
   String newName;
   ArrayList<FieldSchema> newCols;
   String serdeName;
-  HashMap<String, String> props;
+  Map<String, String> props;
   String inputFormat;
   String outputFormat;
   String storageHandler;
@@ -484,7 +484,7 @@ public void setSerdeName(String serdeName) {
    * @return the props
    */
   @Explain(displayName = "properties")
-  public HashMap<String, String> getProps() {
+  public Map<String, String> getProps() {
     return props;
   }
@@ -492,7 +492,7 @@ public void setSerdeName(String serdeName) {
    * @param props
    *          the props to set
    */
-  public void setProps(HashMap<String, String> props) {
+  public void setProps(Map<String, String> props) {
     this.props = props;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
index d91569e..f9d545f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -102,7 +102,7 @@ public MmContext getMmContext() {
    * For exporting Acid table, change the "pointer" to the temp table.
    * This has to be done after the temp table is populated and all necessary Partition objects
    * exist in the metastore.
-   * See {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer#isAcidExport(ASTNode)}
+   * See {@link org.apache.hadoop.hive.ql.parse.AcidExportSemanticAnalyzer#isAcidExport(ASTNode)}
    * for more info.
    */
   public void acidPostProcess(Hive db) throws HiveException {
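The AlterTableDesc change is the usual program-to-the-interface cleanup: the field, getter and setter are now typed against Map rather than HashMap, so callers can hand in any Map implementation. A minimal illustration (the variable and property key are hypothetical):

// Sketch: with setProps(Map<String, String>), callers are no longer forced to build a HashMap.
Map<String, String> props = Collections.singletonMap("some.property", "some-value");
alterTableDesc.setProps(props);   // previously this needed a new HashMap<>(...) even for fixed values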