commit 8829910bf897a4fbce1877d2d260e42ffb367e95 Author: kellyzly Date: Mon Dec 4 17:59:00 2017 +0800 HIVE-17486.1.patch diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3be5a8d..d886f34 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1688,6 +1688,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."), + HIVE_SPARK_SHARED_WORK_OPTIMIZATION("hive.spark.optimize.shared.work", false, + "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + + "and follow-up operators in the query plan and merges them if they meet some preconditions."), HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " + "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " + "work objects and combines them if they meet certain preconditions. Spark only."), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java new file mode 100644 index 0000000..8a90256 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedResult.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.Operator; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class SharedResult { + public final List> retainableOps; + public final List> discardableOps; + public final Set> discardableInputOps; + public final long dataSize; + public final long maxDataSize; + + public SharedResult(Collection> retainableOps, Collection> discardableOps, + Set> discardableInputOps, long dataSize, long maxDataSize) { + this.retainableOps = ImmutableList.copyOf(retainableOps); + this.discardableOps = ImmutableList.copyOf(discardableOps); + this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); + this.dataSize = dataSize; + this.maxDataSize = maxDataSize; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index d4ddb75..7ebd07f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -36,6 +36,7 @@ import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.GenTezUtils; import 
org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -263,13 +265,26 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { GenTezUtils.removeSemiJoinOperator( pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); } - } else if (op instanceof AppMasterEventOperator) { - DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); - if (!sr.discardableOps.contains(dped.getTableScan()) && - !sr.discardableInputOps.contains(dped.getTableScan())) { - GenTezUtils.removeSemiJoinOperator( - pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } else if (op.getConf() instanceof DynamicPruningEventDesc || op.getConf() instanceof SparkPartitionPruningSinkDesc) { + if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan()); + } + } else if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + // Currently Hive on Spark has no equivalent of hive.tez.dynamic.semijoin.reduction (HIVE-16862), so there is nothing to remove here + /*GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan());*/ + } } + } LOG.debug("Input operator removed: {}", op); } @@ -361,7 +376,7 @@ private static void gatherDPPTableScanOps( LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); } - private static Multimap splitTableScanOpsByTable( + public static Multimap splitTableScanOpsByTable( ParseContext pctx) { Multimap tableNameToOps = ArrayListMultimap.create(); // Sort by operator ID so we get deterministic results @@ -375,7 +390,7 @@ private static void gatherDPPTableScanOps( return tableNameToOps; } - private static List> rankTablesByAccumulatedSize(ParseContext pctx) { + public static List> rankTablesByAccumulatedSize(ParseContext pctx) { Map tableToTotalSize = new HashMap<>(); for (Entry e : pctx.getTopOps().entrySet()) { TableScanOperator tsOp = e.getValue(); @@ -402,7 +417,7 @@ public int compare(Map.Entry o1, Map.Entry o2) { return sortedTables; } - private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { // First we check if the two table scan operators can actually be merged // If schemas do not match, we currently do not merge @@ -486,10 +501,11 @@ private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache return true; } - private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + public static SharedResult extractSharedOptimizationInfo(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator retainableTsOp, TableScanOperator discardableTsOp) throws SemanticException { + long noconditionalTaskSize = pctx.getConf().getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); Set> retainableOps = new LinkedHashSet<>(); Set> discardableOps = new
LinkedHashSet<>(); Set> discardableInputOps = new HashSet<>(); @@ -594,7 +610,14 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, if (equalOp1 instanceof MapJoinOperator) { MapJoinOperator mop = (MapJoinOperator) equalOp1; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } if (currentOp1.getChildOperators().size() > 1 || currentOp2.getChildOperators().size() > 1) { @@ -612,7 +635,14 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, if (op instanceof MapJoinOperator && !retainableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } Set> opsWork2 = findWorkOperators(optimizerCache, currentOp2); @@ -620,7 +650,13 @@ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, if (op instanceof MapJoinOperator && !discardableOps.contains(op)) { MapJoinOperator mop = (MapJoinOperator) op; dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + //for tez + if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } else if (HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + // for spark + maxDataSize = noconditionalTaskSize; + } } } @@ -797,7 +833,7 @@ private static boolean compareOperator(ParseContext pctx, Operator op1, Opera return op1.logicalEquals(op2); } - private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, + public static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, SharedResult sr) { // We check whether merging the works would cause the size of @@ -1012,11 +1048,20 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer if (sjbi != null) { set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp())); } - } else if(op.getConf() instanceof DynamicPruningEventDesc) { + } else if(op.getConf() instanceof DynamicPruningEventDesc || op.getConf() instanceof SparkPartitionPruningSinkDesc) { // DPP work is considered a descendant because work needs // to finish for it to execute - set.addAll(findWorkOperators( - optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + //for tez + 
set.addAll(findWorkOperators( + optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan())); + }else if (HiveConf.getVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + //for spark + set.addAll(findWorkOperators( + optimizerCache, ((SparkPartitionPruningSinkDesc) op.getConf()).getTableScan())); + } } } workOps = set; @@ -1064,7 +1109,7 @@ private static boolean validPreConditions(ParseContext pctx, SharedWorkOptimizer return found; } - private static void pushFilterToTopOfTableScan( + public static void pushFilterToTopOfTableScan( SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp) throws UDFArgumentException { ExprNodeGenericFuncDesc tableScanExprNode = tsOp.getConf().getFilterExpr(); @@ -1104,75 +1149,4 @@ private static void pushFilterToTopOfTableScan( } } } - - private static class SharedResult { - final List> retainableOps; - final List> discardableOps; - final Set> discardableInputOps; - final long dataSize; - final long maxDataSize; - - private SharedResult(Collection> retainableOps, Collection> discardableOps, - Set> discardableInputOps, long dataSize, long maxDataSize) { - this.retainableOps = ImmutableList.copyOf(retainableOps); - this.discardableOps = ImmutableList.copyOf(discardableOps); - this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps); - this.dataSize = dataSize; - this.maxDataSize = maxDataSize; - } - } - - /** Cache to accelerate optimization */ - private static class SharedWorkOptimizerCache { - // Operators that belong to each work - final HashMultimap, Operator> operatorToWorkOperators = - HashMultimap., Operator>create(); - // Table scan operators to DPP sources - final Multimap> tableScanToDPPSource = - HashMultimap.>create(); - - // Add new operator to cache work group of existing operator (if group exists) - void putIfWorkExists(Operator opToAdd, Operator existingOp) { - List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); - if (!c.isEmpty()) { - for (Operator op : c) { - operatorToWorkOperators.get(op).add(opToAdd); - } - operatorToWorkOperators.putAll(opToAdd, c); - operatorToWorkOperators.put(opToAdd, opToAdd); - } - } - - // Remove operator - void removeOp(Operator opToRemove) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - if (!c1.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - } - } - - // Remove operator and combine - void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { - Set> s = operatorToWorkOperators.get(opToRemove); - s.remove(opToRemove); - List> c1 = ImmutableList.copyOf(s); - List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); - if (!c1.isEmpty() && !c2.isEmpty()) { - for (Operator op1 : c1) { - operatorToWorkOperators.remove(op1, opToRemove); // Remove operator - operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection - } - operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator - for (Operator op2 : c2) { - operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection - } - } - } - } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java new file mode 100644 index 0000000..d50c456 --- /dev/null +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizerCache.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.parse.GenTezUtils; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import 
com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; +import com.google.common.collect.TreeMultiset; + +/** + * Cache to accelerate optimization + */ +public class SharedWorkOptimizerCache { + // Operators that belong to each work + final HashMultimap, Operator> operatorToWorkOperators = + HashMultimap., Operator>create(); + // Table scan operators to DPP sources + public final Multimap> tableScanToDPPSource = + HashMultimap.>create(); + + // Add new operator to cache work group of existing operator (if group exists) + void putIfWorkExists(Operator opToAdd, Operator existingOp) { + List> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp)); + if (!c.isEmpty()) { + for (Operator op : c) { + operatorToWorkOperators.get(op).add(opToAdd); + } + operatorToWorkOperators.putAll(opToAdd, c); + operatorToWorkOperators.put(opToAdd, opToAdd); + } + } + + // Remove operator + public void removeOp(Operator opToRemove) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + if (!c1.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + } + } + + // Remove operator and combine + public void removeOpAndCombineWork(Operator opToRemove, Operator replacementOp) { + Set> s = operatorToWorkOperators.get(opToRemove); + s.remove(opToRemove); + List> c1 = ImmutableList.copyOf(s); + List> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp)); + if (!c1.isEmpty() && !c2.isEmpty()) { + for (Operator op1 : c1) { + operatorToWorkOperators.remove(op1, opToRemove); // Remove operator + operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection + } + operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator + for (Operator op2 : c2) { + operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index 8cedbe5..1931164 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -89,7 +89,6 @@ LOG.info("Check if operator " + joinOp + " can be converted to map join"); long[] mapJoinInfo = getMapJoinConversionInfo(joinOp, context); int mapJoinConversionPos = (int) mapJoinInfo[0]; - if (mapJoinConversionPos < 0) { return null; } @@ -99,6 +98,9 @@ LOG.info("Convert to non-bucketed map join"); MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); + //Refer tez code ConvertJoinMapJoin.estimateNumBuckets, if numBuckets <0, then defaultNumBucket is 1 + int defaultNumBucket = 1; + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2]/defaultNumBucket); // For native vectorized map join, we require the key SerDe to be BinarySortableSerDe // Note: the MJ may not really get natively-vectorized later, // but changing SerDe won't hurt correctness @@ -111,10 +113,14 @@ LOG.info("Check if it can be converted to bucketed map join"); numBuckets = convertJoinBucketMapJoin(joinOp, mapJoinOp, context, mapJoinConversionPos); + if (numBuckets > 1) { LOG.info("Converted to map join with " + numBuckets + " buckets"); 
bucketColNames = joinOp.getOpTraits().getBucketColNames(); mapJoinInfo[2] /= numBuckets; + //Refer tez code in ConvertJoinMapJoin.getMapJoinConversionPos() + mapJoinOp.getConf().setInMemoryDataSize(mapJoinInfo[2]); + } else { LOG.info("Can not convert to bucketed map join"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java new file mode 100644 index 0000000..8433679 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSharedWorkOptimizer.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import com.google.common.collect.Lists; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.SharedResult; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer; +import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizerCache; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +/** + * Shared computation optimizer. + * + *

Originally, this rule would find scan operators over the same table
+ * in the query plan and merge them if they met some preconditions.
+ *
+ *   TS   TS            TS
+ *   |    |      ->    /  \
+ *   Op   Op          Op  Op
+ *
+ * Now the rule has been extended to find opportunities to other operators
+ * downstream, not only a single table scan.
+ *
+ *  TS1   TS2    TS1   TS2          TS1   TS2
+ *   |     |      |     |            |     |
+ *   |    RS      |    RS            |    RS
+ *    \   /        \   /     ->       \   /
+ *   MapJoin      MapJoin            MapJoin
+ *      |            |                /   \
+ *      Op           Op              Op   Op
+ *
+ * A limitation in the current implementation is that the optimizer does not
+ * go beyond a work boundary.
+ *
+ *

The optimization only works with the Spark execution engine. + **/ + +public class SparkSharedWorkOptimizer extends Transform { + + private final static Logger LOG = LoggerFactory.getLogger(SparkSharedWorkOptimizer.class); + + private HashMap replaceFilterTSMap = new HashMap(); + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + final Map topOps = pctx.getTopOps(); + if (topOps.size() < 2) { + // Nothing to do, bail out + return pctx; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Before SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + // Cache to use during optimization + SharedWorkOptimizerCache optimizerCache = new SharedWorkOptimizerCache(); + + // Gather information about the DPP table scans and store it in the cache + gatherDPPTableScanOps(pctx, optimizerCache); + + // Map of dbName.TblName -> TSOperator + Multimap tableNameToOps = SharedWorkOptimizer.splitTableScanOpsByTable(pctx); + + // We enforce a certain order when we do the reutilization. + // In particular, we use size of table x number of reads to + // rank the tables. + List> sortedTables = SharedWorkOptimizer.rankTablesByAccumulatedSize(pctx); + LOG.debug("Sorted tables by size: {}", sortedTables); + + // Execute optimization + Multimap existingOps = ArrayListMultimap.create(); + Set> removedOps = new HashSet<>(); + for (Map.Entry tablePair : sortedTables) { + String tableName = tablePair.getKey(); + for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) { + if (removedOps.contains(discardableTsOp)) { + LOG.debug("Skip {} as it has been already removed", discardableTsOp); + continue; + } + Collection prevTsOps = existingOps.get(tableName); + for (TableScanOperator retainableTsOp : prevTsOps) { + if (removedOps.contains(retainableTsOp)) { + LOG.debug("Skip {} as it has been already removed", retainableTsOp); + continue; + } + + // First we quickly check if the two table scan operators can actually be merged + boolean mergeable = SharedWorkOptimizer.areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp); + if (!mergeable) { + // Skip + LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); + continue; + } + + // Secondly, we extract information about the part of the tree that can be merged + // as well as some structural information (memory consumption) that needs to be + // used to determined whether the merge can happen + SharedResult sr = SharedWorkOptimizer.extractSharedOptimizationInfo( + pctx, optimizerCache, retainableTsOp, discardableTsOp); + + // It seems these two operators can be merged. + // Check that plan meets some preconditions before doing it. + // In particular, in the presence of map joins in the upstream plan: + // - we cannot exceed the noconditional task size, and + // - if we already merged the big table, we cannot merge the broadcast + // tables. 
+ if (!SharedWorkOptimizer.validPreConditions(pctx, optimizerCache, sr)) { + // Skip + LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp); + continue; + } + + // We can merge + if (sr.retainableOps.size() > 1) { + //TODO find the suitable case for this branch + // More than TS operator + Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); + Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); + if (lastDiscardableOp.getNumChild() != 0) { + List> allChildren = + Lists.newArrayList(lastDiscardableOp.getChildOperators()); + for (Operator op : allChildren) { + lastDiscardableOp.getChildOperators().remove(op); + op.replaceParent(lastDiscardableOp, lastRetainableOp); + lastRetainableOp.getChildOperators().add(op); + } + } + + LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); + } else { + // Only TS operator + ExprNodeGenericFuncDesc exprNode = null; + if (retainableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top of children + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); + // Clone to push to table scan + exprNode = (ExprNodeGenericFuncDesc) retainableTsOp.getConf().getFilterExpr(); + } + if (discardableTsOp.getConf().getFilterExpr() != null) { + // Push filter on top + SharedWorkOptimizer.pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + ExprNodeGenericFuncDesc tsExprNode = discardableTsOp.getConf().getFilterExpr(); + if (exprNode != null && !exprNode.isSame(tsExprNode)) { + // We merge filters from previous scan by ORing with filters from current scan + if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) { + List newChildren = new ArrayList<>(exprNode.getChildren().size() + 1); + for (ExprNodeDesc childExprNode : exprNode.getChildren()) { + if (childExprNode.isSame(tsExprNode)) { + // We do not need to do anything, it is in the OR expression + break; + } + newChildren.add(childExprNode); + } + if (exprNode.getChildren().size() == newChildren.size()) { + newChildren.add(tsExprNode); + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + newChildren); + } + } else { + exprNode = ExprNodeGenericFuncDesc.newInstance( + new GenericUDFOPOr(), + Arrays.asList(exprNode, tsExprNode)); + } + } + } + // Replace filter + retainableTsOp.getConf().setFilterExpr(exprNode); + replaceFilterTSMap.put(discardableTsOp, retainableTsOp); + LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); + } + + // First we remove the input operators of the expression that + // we are going to eliminate + for (Operator op : sr.discardableInputOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + // Remove DPP predicates + if (op instanceof ReduceSinkOperator) { + SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op); + if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) && + !sr.discardableInputOps.contains(sjbi.getTsOp())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + } + } else if (op instanceof AppMasterEventOperator) { + DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator( +// pctx, 
(AppMasterEventOperator) op, dped.getTableScan()); + } + } + LOG.debug("Input operator removed: {}", op); + } + + removedOps.add(discardableTsOp); + // Finally we remove the expression from the tree + for (Operator op : sr.discardableOps) { + //TODO Verify we need optimizerCache.removeOp(op) + optimizerCache.removeOp(op); + removedOps.add(op); + if (sr.discardableOps.size() == 1) { + // If there is a single discardable operator, it is a TableScanOperator + // and it means that we have merged filter expressions for it. Thus, we + // might need to remove DPP predicates from the retainable TableScanOperator + Collection> c = + optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + for (Operator dppSource : c) { + if (dppSource instanceof ReduceSinkOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (ReduceSinkOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } else if (dppSource instanceof AppMasterEventOperator) { + //TODO To find similar code in Spark +// GenTezUtils.removeSemiJoinOperator(pctx, +// (AppMasterEventOperator) dppSource, +// (TableScanOperator) sr.retainableOps.get(0)); + } + } + } + LOG.debug("Operator removed: {}", op); + } + + break; + } + + if (removedOps.contains(discardableTsOp)) { + // This operator has been removed, remove it from the list of existing operators + existingOps.remove(tableName, discardableTsOp); + } else { + // This operator has not been removed, include it in the list of existing operators + existingOps.put(tableName, discardableTsOp); + } + } + } + + // Remove unused table scan operators + Iterator> it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + if (e.getValue().getNumChild() == 0) { + it.remove(); + } + } + + for(TableScanOperator discardableTsOp:replaceFilterTSMap.keySet()){ + TableScanOperator retainedTsOp = replaceFilterTSMap.get(discardableTsOp); + if( retainedTsOp.getConf().getFilterExpr()!= null) { + discardableTsOp.getConf().setFilterExpr( + retainedTsOp.getConf().getFilterExpr()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + + return pctx; + } + + private static void gatherDPPTableScanOps( + ParseContext pctx, SharedWorkOptimizerCache optimizerCache) throws SemanticException { + // Find TS operators with partition pruning enabled in plan + // because these TS may potentially read different data for + // different pipeline. + // These can be: + // 1) TS with DPP. + // 2) TS with semijoin DPP. 
+ Map topOps = pctx.getTopOps(); + Collection> tableScanOps = + Lists.>newArrayList(topOps.values()); + Set s = + OperatorUtils.findOperators(tableScanOps, SparkPartitionPruningSinkOperator.class); + for (SparkPartitionPruningSinkOperator a : s) { + if (a.getConf() instanceof SparkPartitionPruningSinkDesc) { + SparkPartitionPruningSinkDesc dped = (SparkPartitionPruningSinkDesc) a.getConf(); + optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a); + } + } + for (Map.Entry e + : pctx.getRsToSemiJoinBranchInfo().entrySet()) { + optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey()); + } + LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 604c8ae..26cb47a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -155,15 +155,19 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root, public MapWork createMapWork(GenSparkProcContext context, Operator root, SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException { - Preconditions.checkArgument(root.getParentOperators().isEmpty(), - "AssertionError: expected root.getParentOperators() to be empty"); + if( !context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION) ) { + Preconditions.checkArgument(root.getParentOperators().isEmpty(), + "AssertionError: expected root.getParentOperators() to be empty"); + } MapWork mapWork = new MapWork("Map " + (++sequenceNumber)); LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root); // map work starts with table scan operators - Preconditions.checkArgument(root instanceof TableScanOperator, - "AssertionError: expected root to be an instance of TableScanOperator, but was " - + root.getClass().getName()); + if( !context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION) ) { + Preconditions.checkArgument(root instanceof TableScanOperator, + "AssertionError: expected root to be an instance of TableScanOperator, but was " + + root.getClass().getName()); + } String alias_id = null; if (context.parseContext != null && context.parseContext.getTopOps() != null) { for (String currentAliasID : context.parseContext.getTopOps().keySet()) { @@ -175,9 +179,17 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root, } } if (alias_id == null) - alias_id = ((TableScanOperator) root).getConf().getAlias(); + if( root instanceof TableScanOperator ) { + alias_id = ((TableScanOperator) root).getConf().getAlias(); + }else { + //If context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)==true, + //the operator tree which starts from the children of TS maybe a Map, HIVE-17486 + alias_id = root.getOperatorId(); + } if (!deferSetup) { - setupMapWork(mapWork, context, partitions,(TableScanOperator) root, alias_id); + if( root instanceof TableScanOperator) { + setupMapWork(mapWork, context, partitions, (TableScanOperator) root, alias_id); + } } // add new item to the Spark work diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index 5385d5e..913e14b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -60,9 +61,12 @@ */ public class GenSparkWork implements NodeProcessor { static final private Logger LOG = LoggerFactory.getLogger(GenSparkWork.class.getName()); - + //TODO (HIVE-17486): decide how to choose the number of partitions for the SparkEdgeProperty + // between the Map work containing the TS and the Map work containing the child of the TS-RS edge + static final int MAP_PARTITIONS = 1000; // instance of shared utils private GenSparkUtils utils = null; + private boolean hasSeparateTS = false; /** * Constructor takes utils as parameter to facilitate testing @@ -94,6 +98,11 @@ public Object process(Node nd, Stack stack, LOG.debug("Root operator: " + root); LOG.debug("Leaf operator: " + operator); + // The MapWork contains only the TS operator and the Map work that follows it has not been handled yet, + // so reset hasSeparateTS to false + if (root == operator) { + hasSeparateTS = false; + } SparkWork sparkWork = context.currentTask.getWork(); SMBMapJoinOperator smbOp = GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class); @@ -122,7 +131,21 @@ public Object process(Node nd, Stack stack, context.smbMapJoinCtxMap.get(smbOp).mapWork = (MapWork) work; } } else { - work = utils.createReduceWork(context, root, sparkWork); + // Put the operators from the child of the TS up to the ReduceSink into a separate Map work + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION) && !hasSeparateTS) { + work = utils.createMapWork(context, root, sparkWork, null); + MapWork mapWork = (MapWork) work; + // When HIVE_SPARK_SHARED_WORK_OPTIMIZATION is enabled, + // use the operatorId of root (the child of the TS) as the alias + mapWork.getAliasToWork().put(root.getOperatorId(), root); + SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); + edgeProperty.setNumPartitions(MAP_PARTITIONS); + sparkWork.connect(context.preceedingWork, work, edgeProperty); + removeParent(root); + hasSeparateTS = true; + } else { + work = utils.createReduceWork(context, root, sparkWork); + } } context.rootToWorkMap.put(root, work); } @@ -202,7 +225,7 @@ public Object process(Node nd, Stack stack, // a few information, since in future we may reach the parent operators via a // different path, and we may need to connect parent works with the work associated // with this root operator. - if (root.getNumParent() > 0) { + if (root.getNumParent() > 0 && !(root.getParentOperators().get(0) instanceof TableScanOperator)) { Preconditions.checkArgument(work instanceof ReduceWork, "AssertionError: expected work to be a ReduceWork, but was " + work.getClass().getName()); ReduceWork reduceWork = (ReduceWork) work; @@ -277,9 +300,11 @@ public Object process(Node nd, Stack stack, // No children means we're at the bottom. If there are more operators to scan // the next item will be a new root. 
if (!operator.getChildOperators().isEmpty()) { - Preconditions.checkArgument(operator.getChildOperators().size() == 1, - "AssertionError: expected operator.getChildOperators().size() to be 1, but was " - + operator.getChildOperators().size()); + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + Preconditions.checkArgument(operator.getChildOperators().size() == 1, + "AssertionError: expected operator.getChildOperators().size() to be 1, but was " + + operator.getChildOperators().size()); + } context.parentOfRoot = operator; context.currentRootOperator = operator.getChildOperators().get(0); context.preceedingWork = work; @@ -287,4 +312,15 @@ public Object process(Node nd, Stack stack, return null; } + + //remove the parent relation for operator + private void removeParent(Operator operator) { + if (operator.getNumParent() > 0) { + Preconditions.checkArgument(operator.getParentOperators().size() == 1, + "AssertionError: expected operator.getParentOperators().size() to be 1, but was " + + operator.getParentOperators().size()); + Operator parent = operator.getParentOperators().get(0); + operator.removeParent(parent); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 965044d..2d0763a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -28,6 +28,9 @@ import java.util.Stack; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -81,6 +84,7 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkSharedWorkOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver; import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver; import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics; @@ -138,6 +142,10 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, new ConstantPropagate(ConstantPropagateProcCtx.ConstantPropagateOption.SHORTCUT).transform(pCtx); } + if (procCtx.getParseContext().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + new SparkSharedWorkOptimizer().transform(procCtx.getParseContext()); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE); } @@ -414,9 +422,13 @@ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topN throws SemanticException { // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. 
The dispatcher generates the plan from the operator tree - Map opRules = new LinkedHashMap(); + Multimap opRules = LinkedListMultimap.create(); GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils()); + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) { + opRules.put(new RuleRegExp("Split work -TableScan",TableScanOperator.getOperatorName()+"%"),genSparkWork); + } + opRules.put(new RuleRegExp("Split Work - ReduceSink", ReduceSinkOperator.getOperatorName() + "%"), genSparkWork); @@ -491,7 +503,7 @@ public Object process(Node currNode, Stack stack, // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along - Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); + Dispatcher disp = new SparkRuleDispatcher(null, opRules, procCtx); GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx); ogw.startWalking(topNodes, null); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java new file mode 100644 index 0000000..e30a729 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuleDispatcher.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.spark; + +import com.google.common.collect.Multimap; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.Stack; + +public class SparkRuleDispatcher implements Dispatcher { + private final Multimap procRules; + private final NodeProcessorCtx procCtx; + private final Set defaultProcSet; + + /** + * Constructor. + * + * @param defaultProc + * default processor to be fired if no rule matches + * @param rules + * operator processor that handles actual processing of the node + * @param procCtx + * operator processor context, which is opaque to the dispatcher + */ + public SparkRuleDispatcher(NodeProcessor defaultProc, + Multimap rules, NodeProcessorCtx procCtx) { + this.defaultProcSet = new HashSet(); + this.defaultProcSet.add(defaultProc); + procRules = rules; + this.procCtx = procCtx; + } + /** + * Dispatcher function. + * + * @param nd + * operator to process + * @param ndStack + * the operators encountered so far + * @throws SemanticException + */ + @Override + public Object dispatch(Node nd, Stack ndStack, Object... 
nodeOutputs) + throws SemanticException { + + // find the firing rule + // find the rule from the stack specified + Rule rule = null; + int minCost = Integer.MAX_VALUE; + for (Rule r : procRules.keySet()) { + int cost = r.cost(ndStack); + if ((cost >= 0) && (cost < minCost)) { + minCost = cost; + rule = r; + } + } + + Collection procSet; + + if (rule == null) { + procSet = defaultProcSet; + } else { + procSet = procRules.get(rule); + } + + // Do nothing in case proc is null + Object ret = null; + for (NodeProcessor proc : procSet) { + if (proc != null) { + // Call the process function + ret = proc.process(nd, ndStack, procCtx, nodeOutputs); + } + } + return ret; + + } +}
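
Note for reviewers trying the patch: the snippet below is a minimal usage sketch and is NOT part of the patch. It mirrors the gating added to SparkCompiler.optimizeOperatorPlan above; the wrapper class name SparkSharedWorkOptimizerSketch and the assumption that a populated ParseContext from an existing Spark compilation is at hand are illustrative only.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.optimizer.spark.SparkSharedWorkOptimizer;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class SparkSharedWorkOptimizerSketch {
      // Runs the new Spark shared work pass only when the feature flag is on,
      // matching what SparkCompiler.optimizeOperatorPlan does in this patch.
      public static ParseContext maybeOptimize(ParseContext pctx) throws SemanticException {
        HiveConf conf = pctx.getConf();
        // hive.spark.optimize.shared.work defaults to false in this patch.
        if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) {
          // Merge TableScanOperators over the same table (and mergeable
          // follow-up operators) before the Spark task tree is generated.
          return new SparkSharedWorkOptimizer().transform(pctx);
        }
        return pctx;
      }
    }

Because the flag defaults to false, existing Hive on Spark plans are unchanged unless a user opts in with set hive.spark.optimize.shared.work=true;.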