diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 53da72b82b..f65d960953 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1510,6 +1510,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning_3.q,\
   spark_dynamic_partition_pruning_4.q,\
   spark_dynamic_partition_pruning_5.q,\
+  spark_dynamic_partition_pruning_6.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 5d2c759b32..aaf2c30e24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -455,4 +457,19 @@ public static boolean isInBranch(SparkPartitionPruningSinkOperator op) {
     }
     return matchingOps;
   }
+
+  public static Operator<?> findOperatorById(Operator<?> start, String opId) {
+    Deque<Operator<?>> queue = new ArrayDeque<>();
+    queue.add(start);
+    while (!queue.isEmpty()) {
+      Operator<?> op = queue.remove();
+      if (op.getOperatorId().equals(opId)) {
+        return op;
+      }
+      if (op.getChildOperators() != null) {
+        queue.addAll(op.getChildOperators());
+      }
+    }
+    return null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index d6f459bf7b..ed889fad2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -126,31 +125,27 @@ private void processFiles(MapWork work, JobConf jobConf) throws HiveException {
         LOG.info("Start processing pruning file: " + fstatus.getPath());
         in = new ObjectInputStream(fs.open(fstatus.getPath()));
         final int numName = in.readInt();
-        SourceInfo info = null;
-
         Set<String> columnNames = new HashSet<>();
         for (int i = 0; i < numName; i++) {
           columnNames.add(in.readUTF());
         }
+
+        // make sure the dpp sink has output for all the corresponding part columns
         for (SourceInfo si : sourceInfoMap.get(name)) {
-          if (columnNames.contains(si.columnName)) {
-            info = si;
-            break;
-          }
+          Preconditions.checkArgument(columnNames.contains(si.columnName),
+              "AssertionError: no output for column " + si.columnName);
         }
 
-        Preconditions.checkArgument(info != null,
-            "AssertionError: no source info for the column: " +
-                Arrays.toString(columnNames.toArray()));
-
-        // Read fields
+        // Read dpp outputs
         while (in.available() > 0) {
           writable.readFields(in);
 
-          Object row = info.deserializer.deserialize(writable);
-          Object value = info.soi.getStructFieldData(row, info.field);
-          value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector);
-          info.values.add(value);
+          for (SourceInfo info : sourceInfoMap.get(name)) {
+            Object row = info.deserializer.deserialize(writable);
+            Object value = info.soi.getStructFieldData(row, info.field);
+            value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector);
+            info.values.add(value);
+          }
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index f332790a56..3e7e6fad19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -24,6 +24,7 @@
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
@@ -43,9 +44,12 @@
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
@@ -129,7 +133,7 @@ public static SparkSession getSparkSession(HiveConf conf,
       // sessionConf and conf are different objects
       if (sessionConf.getSparkConfigUpdated() || conf.getSparkConfigUpdated()) {
         sparkSessionManager.closeSession(sparkSession);
-        sparkSession = null;
+        sparkSession = null;
         conf.setSparkConfigUpdated(false);
         sessionConf.setSparkConfigUpdated(false);
       }
@@ -263,6 +267,51 @@ public static void removeEmptySparkTask(SparkTask currTask) {
     currTask.removeFromChildrenTasks();
   }
 
+  // Merge the target works of the second DPP sink into the first DPP sink.
+  public static void combineEquivalentDPPSinks(SparkPartitionPruningSinkOperator first,
+      SparkPartitionPruningSinkOperator second) {
+    SparkPartitionPruningSinkDesc firstConf = first.getConf();
+    SparkPartitionPruningSinkDesc secondConf = second.getConf();
+    for (SparkPartitionPruningSinkDesc.DPPTargetInfo targetInfo : secondConf.getTargetInfos()) {
+      MapWork target = targetInfo.work;
+      firstConf.addTarget(targetInfo.columnName, targetInfo.columnType, targetInfo.partKey, target,
+          targetInfo.tableScan);
+
+      if (target != null) {
+        // update the target map work of the second
+        first.addAsSourceEvent(target, targetInfo.partKey, targetInfo.columnName,
+            targetInfo.columnType);
+        second.removeFromSourceEvent(target, targetInfo.partKey, targetInfo.columnName,
+            targetInfo.columnType);
+        target.setTmpPathForPartitionPruning(firstConf.getTmpPathOfTargetWork());
+      }
+    }
+  }
+
+  // Find if there's any DPP sink branch of the branchingOP that is equivalent
+  // to the branch represented by the list.
+  public static SparkPartitionPruningSinkOperator findReusableDPPSink(
+      Operator<? extends OperatorDesc> branchingOP, List<Operator<? extends OperatorDesc>> list) {
+    for (Operator<? extends OperatorDesc> other : branchingOP.getChildOperators()) {
+      int i;
+      for (i = 0; i < list.size(); i++) {
+        if (other == list.get(i) || !other.logicalEquals(list.get(i))) {
+          break;
+        }
+        if (i != list.size() - 1) {
+          if (other.getChildOperators() == null || other.getChildOperators().size() != 1) {
+            break;
+          }
+          other = other.getChildOperators().get(0);
+        }
+      }
+      if (i == list.size()) {
+        return (SparkPartitionPruningSinkOperator) other;
+      }
+    }
+    return null;
+  }
+
   /**
    * For DPP sinks w/ common join, we'll split the tree and what's above the branching
    * operator is computed multiple times. Therefore it may not be good for performance to support
@@ -326,7 +375,7 @@ public static void removeNestedDPP(OptimizeSparkProcContext procContext) {
   }
 
   // whether of pattern "SEL - GBY - DPP"
-  private static boolean isDirectDPPBranch(Operator<?> op) {
+  public static boolean isDirectDPPBranch(Operator<?> op) {
     if (op instanceof SelectOperator && op.getChildOperators() != null
         && op.getChildOperators().size() == 1) {
       op = op.getChildOperators().get(0);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index bb7f69c318..efe504cd73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -57,6 +59,7 @@
 import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
 import org.apache.hadoop.hive.ql.parse.SemiJoinHint;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicValue;
@@ -457,11 +460,17 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
     } else {
       // Must be spark branch
       SparkPartitionPruningSinkDesc desc = new SparkPartitionPruningSinkDesc();
-      desc.setTableScan(ts);
       desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
          .getFieldSchemasFromColumnList(keyExprs, "key")));
-      desc.addTarget(column, columnType, partKey, null);
-      OperatorFactory.getAndMakeChild(desc, groupByOp);
+      desc.addTarget(column, columnType, partKey, null, ts);
+      SparkPartitionPruningSinkOperator dppSink = (SparkPartitionPruningSinkOperator)
+          OperatorFactory.getAndMakeChild(desc, groupByOp);
+      SparkPartitionPruningSinkOperator reusableDPP = SparkUtilities.findReusableDPPSink(parentOfRS,
+          Arrays.asList(selectOp, groupByOp, dppSink));
+      if (reusableDPP != null) {
+        SparkUtilities.combineEquivalentDPPSinks(reusableDPP, dppSink);
+        parentOfRS.removeChild(selectOp);
+      }
     }
   }
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java index 3b722b33a0..a117a6573c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java @@ -58,14 +58,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, if (context.getConf().isSparkDPPOnlyMapjoin() && !op.isWithMapjoin()) { - LOG.info("Disabling dynamic partition pruning based on: " + desc.getTableScan().getName() + LOG.info("Disabling dynamic partition pruning based on: " + desc.getTableScanNames() + ". This is not part of a map join."); remove = true; } else if (desc.getStatistics().getDataSize() > context.getConf() .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) { LOG.info("Disabling dynamic partition pruning based on: " - + desc.getTableScan().getName() + + desc.getTableScanNames() + ". Expected data size is too big: " + desc.getStatistics().getDataSize()); remove = true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java index caf5dcc6a7..124138361b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java @@ -20,9 +20,11 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.Stack; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc.DPPTargetInfo; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -89,15 +92,26 @@ public Object dispatch(Node nd, Stack stack, Object... 
nodeOutputs) throws // For each SparkPartitionPruningSinkOperator, take the target MapWork and see if it is in a dependent SparkTask for (Operator op : pruningSinkOps) { - SparkPartitionPruningSinkOperator pruningSinkOp = (SparkPartitionPruningSinkOperator) op; - MapWork targetMapWork = pruningSinkOp.getConf().getTargetMapWork(); - - // Check if the given SparkTask has a child SparkTask that contains the target MapWork - // If it does not, then remove the DPP op - if (!taskContainsDependentMapWork(task, targetMapWork)) { - LOG.info("Disabling DPP for source work " + baseWork.getName() + " for target work " - + targetMapWork.getName() + " as no dependency exists between the source and target work"); - removeSparkPartitionPruningSink(baseWork, targetMapWork, pruningSinkOp); + SparkPartitionPruningSinkOperator pruningSinkOp = + (SparkPartitionPruningSinkOperator) op; + SparkPartitionPruningSinkDesc desc = pruningSinkOp.getConf(); + List toRemove = new ArrayList<>(); + for (DPPTargetInfo targetInfo : desc.getTargetInfos()) { + MapWork targetMapWork = targetInfo.work; + // Check if the given SparkTask has a child SparkTask that contains the target MapWork + // If it does not, then remove the target from DPP op + if (!taskContainsDependentMapWork(task, targetMapWork)) { + toRemove.add(targetInfo); + pruningSinkOp.removeFromSourceEvent(targetMapWork, targetInfo.partKey, + targetInfo.columnName, targetInfo.columnType); + LOG.info("Removing target map work " + targetMapWork.getName() + " from " + baseWork + .getName() + " as no dependency exists between the two works."); + } + } + desc.getTargetInfos().removeAll(toRemove); + if (desc.getTargetInfos().isEmpty()) { + // The DPP sink has no target, remove the subtree. + OperatorUtils.removeBranch(pruningSinkOp); } } } @@ -106,24 +120,6 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws } } - /** - * Remove a {@link SparkPartitionPruningSinkOperator} from a given {@link BaseWork}. Unlink the target {@link MapWork} - * and the given {@link SparkPartitionPruningSinkOperator}. - */ - private void removeSparkPartitionPruningSink(BaseWork sourceWork, MapWork targetMapWork, - SparkPartitionPruningSinkOperator pruningSinkOp) { - // Remove the DPP operator subtree - OperatorUtils.removeBranch(pruningSinkOp); - - // Remove all event source info from the target MapWork - String sourceWorkId = pruningSinkOp.getUniqueId(); - SparkPartitionPruningSinkDesc pruningSinkDesc = pruningSinkOp.getConf(); - targetMapWork.getEventSourceTableDescMap().get(sourceWorkId).remove(pruningSinkDesc.getTable()); - targetMapWork.getEventSourceColumnNameMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnName()); - targetMapWork.getEventSourceColumnTypeMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnType()); - targetMapWork.getEventSourcePartKeyExprMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetPartKey()); - } - /** * Recursively go through the children of the given {@link Task} and check if any child {@link SparkTask} contains * the specified {@link MapWork} object. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java index a633d2b913..af8ae8af21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java @@ -35,10 +35,8 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -161,54 +159,6 @@ private boolean belongToSet(Set set, BaseWork work, SparkWork sparkWor return false; } - // merge the second into the first - private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first, - SparkPartitionPruningSinkDesc second, String firstId, String secondId) { - MapWork target2 = second.getTargetMapWork(); - - first.addTarget(second.getTargetColumnName(), second.getTargetColumnType(), - second.getTargetPartKey(), target2); - - // update the target map work of the second - target2.setTmpPathForPartitionPruning(first.getTmpPathOfTargetWork()); - - List partKey = target2.getEventSourcePartKeyExprMap().get(secondId); - partKey.remove(second.getTargetPartKey()); - if (partKey.isEmpty()) { - target2.getEventSourcePartKeyExprMap().remove(secondId); - } - List newPartKey = target2.getEventSourcePartKeyExprMap().computeIfAbsent( - firstId, v -> new ArrayList<>()); - newPartKey.add(second.getTargetPartKey()); - - List tableDesc = target2.getEventSourceTableDescMap().get(secondId); - tableDesc.remove(second.getTable()); - if (tableDesc.isEmpty()) { - target2.getEventSourceTableDescMap().remove(secondId); - } - List newTableDesc = target2.getEventSourceTableDescMap().computeIfAbsent( - firstId, v -> new ArrayList<>()); - newTableDesc.add(second.getTable()); - - List columnName = target2.getEventSourceColumnNameMap().get(secondId); - columnName.remove(second.getTargetColumnName()); - if (columnName.isEmpty()) { - target2.getEventSourceColumnNameMap().remove(secondId); - } - List newColumnName = target2.getEventSourceColumnNameMap().computeIfAbsent( - firstId, v -> new ArrayList<>()); - newColumnName.add(second.getTargetColumnName()); - - List columnType = target2.getEventSourceColumnTypeMap().get(secondId); - columnType.remove(second.getTargetColumnType()); - if (columnType.isEmpty()) { - target2.getEventSourceColumnTypeMap().remove(secondId); - } - List newColumnType = target2.getEventSourceColumnTypeMap().computeIfAbsent( - firstId, v -> new ArrayList<>()); - newColumnType.add(second.getTargetColumnType()); - } - private Set combineEquivalentWorks(Set> equivalentWorks, SparkWork sparkWork) { Set removedWorks = Sets.newHashSet(); for (Set workSet : equivalentWorks) { @@ -222,8 +172,7 @@ private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first, List dppList2 = workToDpps.get(next); // equivalent works must have dpp lists of same size for (int i = 0; i < dppList1.size(); i++) { - combineEquivalentDPPSinks(dppList1.get(i).getConf(), dppList2.get(i).getConf(), - dppList1.get(i).getUniqueId(), dppList2.get(i).getUniqueId()); + 
SparkUtilities.combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i)); } } replaceWork(next, first, sparkWork); @@ -428,7 +377,10 @@ private void removeDynamicPartitionPruningSink(List removedMapWorkList, SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class); for (Operator pruneSinkOp : pruningList) { SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp; - if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetMapWork().getName())) { + for (String removedName : removedMapWorkList) { + sparkPruneSinkOp.getConf().removeTarget(removedName); + } + if (sparkPruneSinkOp.getConf().getTargetInfos().isEmpty()) { LOG.debug("ready to remove the sparkPruneSinkOp which target work is " + sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " + "has been deleted!"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index bacc44482a..7b1fd5f206 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -478,7 +478,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeSparkProc OperatorUtils.removeBranch(partitionPruningSinkOp); // at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " - + (partitionPruningSinkOp.getConf()).getTableScan().getName() + + (partitionPruningSinkOp.getConf()).getTableScanNames() + ". Need to be removed together with reduce sink"); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java index d1c53cf345..cffdaf2dec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java @@ -18,10 +18,8 @@ package org.apache.hadoop.hive.ql.optimizer.spark; -import com.google.common.base.Preconditions; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -44,12 +42,15 @@ // the partition column we're interested in public ExprNodeDesc partKey; public MapWork work; + public transient TableScanOperator tableScan; - DPPTargetInfo(String columnName, String columnType, ExprNodeDesc partKey, MapWork work) { + DPPTargetInfo(String columnName, String columnType, ExprNodeDesc partKey, MapWork work, + TableScanOperator tableScan) { this.columnName = columnName; this.columnType = columnType; this.partKey = partKey; this.work = work; + this.tableScan = tableScan; } } @@ -57,52 +58,18 @@ private TableDesc table; - private transient TableScanOperator tableScan; - private Path path; public List getTargetInfos() { return targetInfos; } - private void assertSingleTarget() { - Preconditions.checkState(targetInfos.size() < 2, "The DPP sink has multiple targets."); - } - - public String getTargetColumnName() { - assertSingleTarget(); - return targetInfos.isEmpty() ? 
null : targetInfos.get(0).columnName; - } - - public String getTargetColumnType() { - assertSingleTarget(); - return targetInfos.isEmpty() ? null : targetInfos.get(0).columnType; - } - - public ExprNodeDesc getTargetPartKey() { - assertSingleTarget(); - return targetInfos.isEmpty() ? null : targetInfos.get(0).partKey; - } - - public MapWork getTargetMapWork() { - assertSingleTarget(); - return targetInfos.isEmpty() ? null : targetInfos.get(0).work; + public void addTarget(String colName, String colType, ExprNodeDesc partKey, MapWork mapWork, + TableScanOperator tableScan) { + targetInfos.add(new DPPTargetInfo(colName, colType, partKey, mapWork, tableScan)); } - public void addTarget(String colName, String colType, ExprNodeDesc partKey, MapWork mapWork) { - targetInfos.add(new DPPTargetInfo(colName, colType, partKey, mapWork)); - } - - public void setTargetMapWork(MapWork mapWork) { - Preconditions.checkState(targetInfos.size() == 1, - "The DPP sink should have exactly one target."); - targetInfos.get(0).work = mapWork; - // in order to make the col name unique, prepend the targetId - targetInfos.get(0).columnName = SparkUtilities.getWorkId(mapWork) + ":" + - targetInfos.get(0).columnName; - } - - Path getTmpPathOfTargetWork() { + public Path getTmpPathOfTargetWork() { return targetInfos.isEmpty() ? null : targetInfos.get(0).work.getTmpPathForPartitionPruning(); } @@ -120,12 +87,8 @@ public String getTargetWorks() { return Arrays.toString(targetInfos.stream().map(info -> info.work.getName()).toArray()); } - public TableScanOperator getTableScan() { - return tableScan; - } - - public void setTableScan(TableScanOperator tableScan) { - this.tableScan = tableScan; + public String getTableScanNames() { + return Arrays.toString(targetInfos.stream().map(info -> info.tableScan.getName()).toArray()); } @Explain(displayName = "Target column") @@ -156,4 +119,14 @@ public boolean isSame(OperatorDesc other) { } return false; } + + public void removeTarget(String name) { + List toRemove = new ArrayList<>(); + for (DPPTargetInfo targetInfo : targetInfos) { + if (targetInfo.work.getName().equals(name)) { + toRemove.add(targetInfo); + } + } + targetInfos.removeAll(toRemove); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index ba4bb5978a..900a80000d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory; @@ -64,7 +65,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,7 +256,10 @@ public void removeUnionOperators(GenSparkProcContext context, BaseWork work) } else if (op instanceof SparkPartitionPruningSinkOperator) { SparkPartitionPruningSinkOperator oldPruningSink = (SparkPartitionPruningSinkOperator) op; SparkPartitionPruningSinkOperator 
newPruningSink = (SparkPartitionPruningSinkOperator) newOp; - newPruningSink.getConf().setTableScan(oldPruningSink.getConf().getTableScan()); + for (int i = 0; i < oldPruningSink.getConf().getTargetInfos().size(); i++) { + newPruningSink.getConf().getTargetInfos().get(i).tableScan = + oldPruningSink.getConf().getTargetInfos().get(i).tableScan; + } context.pruningSinkSet.add(newPruningSink); context.pruningSinkSet.remove(oldPruningSink); } @@ -456,52 +459,27 @@ public static Path createMoveTask(Task currTask, boolean public void processPartitionPruningSink(GenSparkProcContext context, SparkPartitionPruningSinkOperator pruningSink) { SparkPartitionPruningSinkDesc desc = pruningSink.getConf(); - TableScanOperator ts = desc.getTableScan(); - MapWork targetWork = (MapWork) context.rootToWorkMap.get(ts); - - Preconditions.checkArgument( - targetWork != null, - "No targetWork found for tablescan " + ts); - - String sourceId = pruningSink.getUniqueId(); - - // set up temporary path to communicate between the small/big table - Path tmpPath = targetWork.getTmpPathForPartitionPruning(); - if (tmpPath == null) { - tmpPath = getDPPOutputPath(context.parseContext.getContext()); - targetWork.setTmpPathForPartitionPruning(tmpPath); - LOG.info("Setting tmp path between source work and target work:\n" + tmpPath); - } - - desc.setPath(new Path(tmpPath, sourceId)); - desc.setTargetMapWork(targetWork); - - // store table descriptor in map-targetWork - if (!targetWork.getEventSourceTableDescMap().containsKey(sourceId)) { - targetWork.getEventSourceTableDescMap().put(sourceId, new LinkedList()); - } - List tables = targetWork.getEventSourceTableDescMap().get(sourceId); - tables.add(pruningSink.getConf().getTable()); - - // store column name in map-targetWork - if (!targetWork.getEventSourceColumnNameMap().containsKey(sourceId)) { - targetWork.getEventSourceColumnNameMap().put(sourceId, new LinkedList()); - } - List columns = targetWork.getEventSourceColumnNameMap().get(sourceId); - columns.add(desc.getTargetColumnName()); + final Path outputBase = getDPPOutputPath(context.parseContext.getContext()); + final String sourceId = pruningSink.getUniqueId(); + desc.setPath(new Path(outputBase, sourceId)); + + for (SparkPartitionPruningSinkDesc.DPPTargetInfo targetInfo : desc.getTargetInfos()) { + TableScanOperator ts = targetInfo.tableScan; + MapWork targetWork = (MapWork) context.rootToWorkMap.get(ts); + Preconditions.checkNotNull(targetWork, "No targetWork found for tablescan " + ts); + + // set up temporary path to communicate between the small/big table + if (targetWork.getTmpPathForPartitionPruning() == null) { + targetWork.setTmpPathForPartitionPruning(outputBase); + LOG.info("Setting tmp path between source work and target work:\n" + outputBase); + } - if (!targetWork.getEventSourceColumnTypeMap().containsKey(sourceId)) { - targetWork.getEventSourceColumnTypeMap().put(sourceId, new LinkedList()); - } - List columnTypes = targetWork.getEventSourceColumnTypeMap().get(sourceId); - columnTypes.add(desc.getTargetColumnType()); + targetInfo.work = targetWork; + targetInfo.columnName = SparkUtilities.getWorkId(targetWork) + ":" + targetInfo.columnName; - // store partition key expr in map-targetWork - if (!targetWork.getEventSourcePartKeyExprMap().containsKey(sourceId)) { - targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList()); + pruningSink.addAsSourceEvent(targetWork, targetInfo.partKey, targetInfo.columnName, + targetInfo.columnType); } - List keys = 
targetWork.getEventSourcePartKeyExprMap().get(sourceId); - keys.add(desc.getTargetPartKey()); } private Path getDPPOutputPath(Context context) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 5220281ccf..bf18dbcd6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -209,7 +209,7 @@ private void removeDPPOperator(Set> component, OptimizeSparkProcCont OperatorUtils.removeBranch(toRemove); // at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " - + toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency"); + + toRemove.getConf().getTableScanNames() + ". Needed to break cyclic dependency"); } // Tarjan's algo @@ -241,9 +241,12 @@ private void connect(Operator o, AtomicInteger index, Stack> node if (o instanceof SparkPartitionPruningSinkOperator) { children = new ArrayList<>(); children.addAll(o.getChildOperators()); - TableScanOperator ts = ((SparkPartitionPruningSinkDesc) o.getConf()).getTableScan(); - LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString()); - children.add(ts); + SparkPartitionPruningSinkDesc dppDesc = ((SparkPartitionPruningSinkOperator) o).getConf(); + for (SparkPartitionPruningSinkDesc.DPPTargetInfo targetInfo : dppDesc.getTargetInfos()) { + TableScanOperator ts = targetInfo.tableScan; + LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString()); + children.add(ts); + } } else { children = o.getChildOperators(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java index 143f777032..f4a3822ea5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java @@ -21,6 +21,8 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -34,6 +36,9 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -216,4 +221,67 @@ public synchronized String getUniqueId() { public synchronized void setUniqueId(String uniqueId) { this.uniqueId = uniqueId; } + + public void addAsSourceEvent(MapWork mapWork, ExprNodeDesc partKey, String columnName, + String columnType) { + String sourceId = getUniqueId(); + SparkPartitionPruningSinkDesc conf = getConf(); + + // store table descriptor in map-targetWork + List tableDescs = mapWork.getEventSourceTableDescMap().computeIfAbsent(sourceId, + v -> new ArrayList<>()); + tableDescs.add(conf.getTable()); + + // store partition key expr in map-targetWork + List partKeys = 
mapWork.getEventSourcePartKeyExprMap().computeIfAbsent(sourceId, + v -> new ArrayList<>()); + partKeys.add(partKey); + + // store column name in map-targetWork + List columnNames = mapWork.getEventSourceColumnNameMap().computeIfAbsent(sourceId, + v -> new ArrayList<>()); + columnNames.add(columnName); + + List columnTypes = mapWork.getEventSourceColumnTypeMap().computeIfAbsent(sourceId, + v -> new ArrayList<>()); + columnTypes.add(columnType); + } + + public void removeFromSourceEvent(MapWork mapWork, ExprNodeDesc partKey, String columnName, + String columnType) { + String sourceId = getUniqueId(); + SparkPartitionPruningSinkDesc conf = getConf(); + + List tableDescs = mapWork.getEventSourceTableDescMap().get(sourceId); + if (tableDescs != null) { + tableDescs.remove(conf.getTable()); + if (tableDescs.isEmpty()) { + mapWork.getEventSourceTableDescMap().remove(sourceId); + } + } + + List partKeys = mapWork.getEventSourcePartKeyExprMap().get(sourceId); + if (partKeys != null) { + partKeys.remove(partKey); + if (partKeys.isEmpty()) { + mapWork.getEventSourcePartKeyExprMap().remove(sourceId); + } + } + + List columnNames = mapWork.getEventSourceColumnNameMap().get(sourceId); + if (columnNames != null) { + columnNames.remove(columnName); + if (columnNames.isEmpty()) { + mapWork.getEventSourceColumnNameMap().remove(sourceId); + } + } + + List columnTypes = mapWork.getEventSourceColumnTypeMap().get(sourceId); + if (columnTypes != null) { + columnTypes.remove(columnType); + if (columnTypes.isEmpty()) { + mapWork.getEventSourceColumnTypeMap().remove(sourceId); + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java index 90c28e8986..b0d7eeca3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java @@ -25,9 +25,9 @@ import java.util.Set; import java.util.Stack; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -102,8 +102,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Operator branchingOp = pruningSinkOp.getBranchingOp(); List> savedChildOps = branchingOp.getChildOperators(); List> firstNodesOfPruningBranch = findFirstNodesOfPruningBranch(branchingOp); - branchingOp.setChildOperators(Utilities.makeList(firstNodesOfPruningBranch.toArray(new - Operator[firstNodesOfPruningBranch.size()]))); + branchingOp.setChildOperators(null); // Now clone the tree above selOp List> newRoots = SerializationUtilities.cloneOperatorTree(roots); @@ -114,11 +113,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } context.clonedPruningTableScanSet.addAll(newRoots); - //Find all pruningSinkSet in old roots - List> oldsinkList = new ArrayList<>(); - for (Operator root : roots) { - SparkUtilities.collectOp(oldsinkList, root, SparkPartitionPruningSinkOperator.class); + Operator newBranchingOp = null; + for (int i = 0; i < newRoots.size() && newBranchingOp == null; i++) { + newBranchingOp = OperatorUtils.findOperatorById(newRoots.get(i), branchingOp.getOperatorId()); } + 
Preconditions.checkNotNull(newBranchingOp, + "Cannot find the branching operator in cloned tree."); + newBranchingOp.setChildOperators(firstNodesOfPruningBranch); // Restore broken links between operators, and remove the branch from the original tree branchingOp.setChildOperators(savedChildOps); @@ -126,19 +127,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, branchingOp.removeChild(selOp); } - //Find all pruningSinkSet in new roots Set> sinkSet = new LinkedHashSet<>(); - for (Operator root : newRoots) { - SparkUtilities.collectOp(sinkSet, root, SparkPartitionPruningSinkOperator.class); - } - - int i = 0; - for (Operator clonedPruningSinkOp : sinkSet) { - SparkPartitionPruningSinkOperator oldsinkOp = (SparkPartitionPruningSinkOperator) oldsinkList.get(i++); - ((SparkPartitionPruningSinkOperator) clonedPruningSinkOp).getConf().setTableScan(oldsinkOp.getConf().getTableScan()); - context.pruningSinkSet.add(clonedPruningSinkOp); - + for (Operator sel : firstNodesOfPruningBranch) { + SparkUtilities.collectOp(sinkSet, sel, SparkPartitionPruningSinkOperator.class); + sel.setParentOperators(Utilities.makeList(newBranchingOp)); } + context.pruningSinkSet.addAll(sinkSet); return null; } @@ -147,9 +141,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, private List> findFirstNodesOfPruningBranch(Operator branchingOp) { List> res = new ArrayList<>(); for (Operator child : branchingOp.getChildOperators()) { - List> pruningList = new ArrayList<>(); - SparkUtilities.collectOp(pruningList, child, SparkPartitionPruningSinkOperator.class); - if (pruningList.size() > 0) { + if (SparkUtilities.isDirectDPPBranch(child)) { res.add(child); } } diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_6.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_6.q new file mode 100644 index 0000000000..222d5af035 --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_6.q @@ -0,0 +1,32 @@ +set hive.spark.dynamic.partition.pruning=true; + +-- SORT_QUERY_RESULTS + +-- This qfile tests whether we can combine DPP sinks within a BaseWork + +create table part_table_1 (col int) partitioned by (part_col int); +create table part_table_2 (col int) partitioned by (part_col int); +create table regular_table (col int); + +insert into table regular_table values (1); + +alter table part_table_1 add partition (part_col=1); +insert into table part_table_1 partition (part_col=1) values (1), (2), (3), (4); + +alter table part_table_1 add partition (part_col=2); +insert into table part_table_1 partition (part_col=2) values (1), (2), (3), (4); + +alter table part_table_2 add partition (part_col=1); +insert into table part_table_2 partition (part_col=1) values (1), (2), (3), (4); + +alter table part_table_2 add partition (part_col=2); +insert into table part_table_2 partition (part_col=2) values (1), (2), (3), (4); + +-- dpp sinks should be combined + +explain +select * from regular_table, part_table_1, part_table_2 +where regular_table.col = part_table_1.part_col and regular_table.col = part_table_2.part_col; + +select * from regular_table, part_table_1, part_table_2 +where regular_table.col = part_table_1.part_col and regular_table.col = part_table_2.part_col; diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out index e7a789ad65..86acc95059 100644 --- 
a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out +++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out @@ -1424,24 +1424,10 @@ STAGE PLANS: outputColumnNames: _col0 Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE Spark Partition Pruning Sink Operator - Target column: [4:q (string), 8:q (string)] - partition key expr: [q, q] - Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - target works: [Map 4, Map 8] - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: string) - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - Spark Partition Pruning Sink Operator - Target column: [4:p (string), 8:p (string)] - partition key expr: [p, p] + Target column: [4:q (string), 4:p (string), 8:q (string), 8:p (string)] + partition key expr: [q, p, q, p] Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - target works: [Map 4, Map 8] + target works: [Map 4, Map 4, Map 8, Map 8] Stage: Stage-1 Spark @@ -1700,24 +1686,10 @@ STAGE PLANS: outputColumnNames: _col0 Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE Spark Partition Pruning Sink Operator - Target column: [4:q (string)] - partition key expr: [q] - Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - target works: [Map 4] - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: string) - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - Spark Partition Pruning Sink Operator - Target column: [4:p (string)] - partition key expr: [p] + Target column: [4:q (string), 4:p (string)] + partition key expr: [q, p] Statistics: Num rows: 200 Data size: 2000 Basic stats: COMPLETE Column stats: NONE - target works: [Map 4] + target works: [Map 4, Map 4] Reducer 12 Reduce Operator Tree: Select Operator diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_6.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_6.q.out new file mode 100644 index 0000000000..83bbd13b50 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_6.q.out @@ -0,0 +1,286 @@ +PREHOOK: query: create table part_table_1 (col int) partitioned by (part_col int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@part_table_1 +POSTHOOK: query: create table part_table_1 (col int) partitioned by (part_col int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@part_table_1 +PREHOOK: query: create table part_table_2 (col int) partitioned by (part_col int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@part_table_2 +POSTHOOK: query: create table part_table_2 (col int) partitioned by (part_col int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@part_table_2 +PREHOOK: query: create table regular_table (col int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default 
+PREHOOK: Output: default@regular_table +POSTHOOK: query: create table regular_table (col int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@regular_table +PREHOOK: query: insert into table regular_table values (1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@regular_table +POSTHOOK: query: insert into table regular_table values (1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@regular_table +POSTHOOK: Lineage: regular_table.col SCRIPT [] +PREHOOK: query: alter table part_table_1 add partition (part_col=1) +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@part_table_1 +POSTHOOK: query: alter table part_table_1 add partition (part_col=1) +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@part_table_1 +POSTHOOK: Output: default@part_table_1@part_col=1 +PREHOOK: query: insert into table part_table_1 partition (part_col=1) values (1), (2), (3), (4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@part_table_1@part_col=1 +POSTHOOK: query: insert into table part_table_1 partition (part_col=1) values (1), (2), (3), (4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@part_table_1@part_col=1 +POSTHOOK: Lineage: part_table_1 PARTITION(part_col=1).col SCRIPT [] +PREHOOK: query: alter table part_table_1 add partition (part_col=2) +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@part_table_1 +POSTHOOK: query: alter table part_table_1 add partition (part_col=2) +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@part_table_1 +POSTHOOK: Output: default@part_table_1@part_col=2 +PREHOOK: query: insert into table part_table_1 partition (part_col=2) values (1), (2), (3), (4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@part_table_1@part_col=2 +POSTHOOK: query: insert into table part_table_1 partition (part_col=2) values (1), (2), (3), (4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@part_table_1@part_col=2 +POSTHOOK: Lineage: part_table_1 PARTITION(part_col=2).col SCRIPT [] +PREHOOK: query: alter table part_table_2 add partition (part_col=1) +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@part_table_2 +POSTHOOK: query: alter table part_table_2 add partition (part_col=1) +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@part_table_2 +POSTHOOK: Output: default@part_table_2@part_col=1 +PREHOOK: query: insert into table part_table_2 partition (part_col=1) values (1), (2), (3), (4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@part_table_2@part_col=1 +POSTHOOK: query: insert into table part_table_2 partition (part_col=1) values (1), (2), (3), (4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@part_table_2@part_col=1 +POSTHOOK: Lineage: part_table_2 PARTITION(part_col=1).col SCRIPT [] +PREHOOK: query: alter table part_table_2 add partition (part_col=2) +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@part_table_2 +POSTHOOK: query: alter table part_table_2 add partition (part_col=2) +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@part_table_2 +POSTHOOK: Output: default@part_table_2@part_col=2 +PREHOOK: query: insert into table part_table_2 partition (part_col=2) values (1), (2), 
(3), (4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@part_table_2@part_col=2 +POSTHOOK: query: insert into table part_table_2 partition (part_col=2) values (1), (2), (3), (4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@part_table_2@part_col=2 +POSTHOOK: Lineage: part_table_2 PARTITION(part_col=2).col SCRIPT [] +PREHOOK: query: explain +select * from regular_table, part_table_1, part_table_2 +where regular_table.col = part_table_1.part_col and regular_table.col = part_table_2.part_col +PREHOOK: type: QUERY +POSTHOOK: query: explain +select * from regular_table, part_table_1, part_table_2 +where regular_table.col = part_table_1.part_col and regular_table.col = part_table_2.part_col +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark +#### A masked pattern was here #### + Vertices: + Map 5 + Map Operator Tree: + TableScan + alias: regular_table + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: col is not null (type: boolean) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: col (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: int) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + Target column: [1:part_col (int), 4:part_col (int)] + partition key expr: [part_col, part_col] + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + target works: [Map 1, Map 4] + Map 6 + Map Operator Tree: + TableScan + alias: part_table_2 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: col (type: int), part_col (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col1 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: int) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + Target column: [1:part_col (int)] + partition key expr: [part_col] + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + target works: [Map 1] + + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 3 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: part_table_1 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: col (type: int), part_col (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: int) + sort order: + + Map-reduce partition columns: _col1 (type: 
int) + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Map 3 + Map Operator Tree: + TableScan + alias: regular_table + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: col is not null (type: boolean) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: col (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Map 4 + Map Operator Tree: + TableScan + alias: part_table_2 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: col (type: int), part_col (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: int) + sort order: + + Map-reduce partition columns: _col1 (type: int) + Statistics: Num rows: 8 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + Inner Join 1 to 2 + keys: + 0 _col1 (type: int) + 1 _col0 (type: int) + 2 _col1 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 17 Data size: 17 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col2 (type: int), _col0 (type: int), _col1 (type: int), _col3 (type: int), _col4 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 17 Data size: 17 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 17 Data size: 17 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select * from regular_table, part_table_1, part_table_2 +where regular_table.col = part_table_1.part_col and regular_table.col = part_table_2.part_col +PREHOOK: type: QUERY +PREHOOK: Input: default@part_table_1 +PREHOOK: Input: default@part_table_1@part_col=1 +PREHOOK: Input: default@part_table_1@part_col=2 +PREHOOK: Input: default@part_table_2 +PREHOOK: Input: default@part_table_2@part_col=1 +PREHOOK: Input: default@part_table_2@part_col=2 +PREHOOK: Input: default@regular_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from regular_table, part_table_1, part_table_2 +where regular_table.col = part_table_1.part_col and regular_table.col = part_table_2.part_col +POSTHOOK: type: QUERY +POSTHOOK: Input: default@part_table_1 +POSTHOOK: Input: default@part_table_1@part_col=1 +POSTHOOK: Input: default@part_table_1@part_col=2 +POSTHOOK: Input: default@part_table_2 +POSTHOOK: Input: default@part_table_2@part_col=1 +POSTHOOK: Input: default@part_table_2@part_col=2 +POSTHOOK: Input: default@regular_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 1 1 1 1 +1 1 1 2 1 +1 1 1 3 1 +1 1 1 4 1 +1 2 1 1 1 +1 2 1 2 1 +1 2 1 3 1 +1 2 1 4 1 +1 3 1 1 1 +1 3 1 2 1 
+1 3 1 3 1
+1 3 1 4 1
+1 4 1 1 1
+1 4 1 2 1
+1 4 1 3 1
+1 4 1 4 1
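
Reviewer note (not part of the patch): the new OperatorUtils.findOperatorById added above is a plain breadth-first search over the operator DAG keyed on operator id, and SplitOpTreeForDPP uses it to locate the cloned branching operator by the id it shares with the original. The following standalone sketch illustrates the same traversal; the Node class, FindByIdSketch, and the operator ids used are hypothetical stand-ins, not Hive's Operator API.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    // Minimal stand-in for an operator-tree node; Hive's Operator exposes
    // getOperatorId() and getChildOperators() in the same roles.
    class Node {
      final String id;
      final List<Node> children = new ArrayList<>();
      Node(String id) { this.id = id; }
    }

    public class FindByIdSketch {
      // Breadth-first search by id, mirroring the shape of findOperatorById in this patch.
      static Node findById(Node start, String id) {
        Deque<Node> queue = new ArrayDeque<>();
        queue.add(start);
        while (!queue.isEmpty()) {
          Node n = queue.remove();
          if (n.id.equals(id)) {
            return n;
          }
          queue.addAll(n.children);
        }
        return null;
      }

      public static void main(String[] args) {
        Node ts = new Node("TS_0");
        Node sel = new Node("SEL_1");
        Node gby = new Node("GBY_2");
        ts.children.add(sel);
        sel.children.add(gby);
        System.out.println(findById(ts, "GBY_2").id); // prints GBY_2
      }
    }

As in the patch, the lookup works because cloned operators keep the operator id of the operator they were copied from, so searching the cloned roots for the original branching operator's id yields the corresponding node in the copy.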