diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/AbstractSparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/AbstractSparkShuffler.java
new file mode 100644
index 0000000..70e4ebf
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/AbstractSparkShuffler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+
+public abstract class AbstractSparkShuffler implements SparkShuffler {
+  private SparkEdgeProperty edge;
+
+  public AbstractSparkShuffler(SparkEdgeProperty edge) {
+    this.edge = edge;
+  }
+
+  public int getPartitionNumber() {
+    int partitionNumber = edge.getNumPartitions() <= 0 ? 1 : edge.getNumPartitions();
+    return partitionNumber;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index abd4718..7bb7bec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -1,14 +1,21 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class GroupByShuffler implements SparkShuffler {
+public class GroupByShuffler extends AbstractSparkShuffler {
+
+
+  public GroupByShuffler(SparkEdgeProperty edge) {
+    super(edge);
+  }
 
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
-    return input.groupByKey(/* default to hash partition */);
+    return input.groupByKey(new HashPartitioner(this.getPartitionNumber()));
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index f262065..4616107 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -20,13 +20,18 @@
 import java.util.Iterator;
 
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
 
-public class SortByShuffler implements SparkShuffler {
+public class SortByShuffler extends AbstractSparkShuffler {
+
+  public SortByShuffler(SparkEdgeProperty edge) {
+    super(edge);
+  }
 
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 73553ee..f7eeead 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -111,7 +111,7 @@ private MapTran generate(MapWork mw) throws IOException {
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
     // TODO: create different shuffler based on edge prop.
-    return new GroupByShuffler();
+    return new GroupByShuffler(edge);
   }
 
   private ReduceTran generate(ReduceWork rw) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index fb25596..4533d11 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -18,28 +18,49 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
+  private transient JobConf job;
+  private transient ContentSummary inputSummary = null;
+
+  @Override
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
+    this.job = new JobConf(conf, SparkTask.class);
+  }
 
   @Override
   public int execute(DriverContext driverContext) {
+    int rc = 1;
     SparkClient client = null;
     try {
+      setNumberOfReducers();
       client = SparkClient.getInstance(driverContext.getCtx().getConf());
       rc = client.execute(driverContext, getWork());
-    } finally {
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.error("Failed to execute spark task.", e);
+      return 1;
+    }
+    finally {
       if (client != null) {
         rc = close(rc);
       }
@@ -86,4 +107,56 @@ public String getName() {
     return "SPARK";
   }
 
+  /**
+   * Set the number of reducers for the spark work.
+   */
+  private void setNumberOfReducers() throws IOException {
+    for (BaseWork baseWork : work.getAllWork()) {
+      if (baseWork instanceof ReduceWork) {
+        setNumberOfReducers((ReduceWork) baseWork);
+      }
+    }
+
+    console
+        .printInfo("In order to change the average load for a reducer (in bytes):");
+    console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
+        + "=<number>");
+    console.printInfo("In order to limit the maximum number of reducers:");
+    console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
+        + "=<number>");
+    console.printInfo("In order to set a constant number of reducers:");
+    console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
+        + "=<number>");
+  }
+
+  private void setNumberOfReducers(ReduceWork rWork) throws IOException {
+    // this is a temporary hack to fix things that are not fixed in the compiler
+    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
+
+    if (rWork == null) {
+      console
+          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+    } else {
+      if (numReducersFromWork >= 0) {
+        console.printInfo("Number of reduce tasks determined at compile time: "
+            + rWork.getNumReduceTasks());
+      } else if (job.getNumReduceTasks() > 0) {
+        int reducers = job.getNumReduceTasks();
+        rWork.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+                + reducers);
+      } else {
+        if (inputSummary == null) {
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+        }
+        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+            false);
+        rWork.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+                + reducers);
+      }
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index d7e1fbf..9fd5f07 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -126,7 +126,8 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new StatsOptimizer());
     }
     if (pctx.getContext().getExplain() ||
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ||
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
new file mode 100644
index 0000000..419cb75
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import java.util.Stack;
+
+/**
+ * SetSparkReducerParallelism determines how many reducers should
+ * be run for a given reduce sink, clone from SetReducerParallelism.
+ */
+public class SetSparkReducerParallelism implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(SetSparkReducerParallelism.class.getName());
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+
+    ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+    ReduceSinkDesc desc = sink.getConf();
+
+    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+    int constantReducers = context.conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+    if (context.visitedReduceSinks.contains(sink)) {
+      // skip walking the children
+      LOG.debug("Already processed reduce sink: " + sink.getName());
+      return true;
+    }
+
+    context.visitedReduceSinks.add(sink);
+
+    if (desc.getNumReducers() <= 0) {
+      if (constantReducers > 0) {
+        LOG.info("Parallelism for reduce sink "+sink+" set by user to "+constantReducers);
+        desc.setNumReducers(constantReducers);
+      } else {
+        long numberOfBytes = 0;
+
+        // we need to add up all the estimates from the siblings of this reduce sink
+        for (Operator<? extends OperatorDesc> sibling:
+            sink.getChildOperators().get(0).getParentOperators()) {
+          if (sibling.getStatistics() != null) {
+            numberOfBytes += sibling.getStatistics().getDataSize();
+          } else {
+            LOG.warn("No stats available from: "+sibling);
+          }
+        }
+
+        int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+            maxReducers, false);
+        LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
+        desc.setNumReducers(numReducers);
+        desc.setAutoParallel(true);
+      }
+    } else {
+      LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
+    }
+
+    return false;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 25eea14..c306fa2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -92,14 +92,6 @@ public UnionWork createUnionWork(GenSparkProcContext context, Operator operat
   public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root,
       SparkWork sparkWork) {
     assert !root.getParentOperators().isEmpty();
-    boolean isAutoReduceParallelism =
-        context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
-
-    float maxPartitionFactor =
-        context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
-    float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
-    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
     logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
@@ -114,36 +106,11 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
-    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
-      reduceWork.setAutoReduceParallelism(true);
-
-      // configured limit for reducers
-      int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
-      // min we allow spark to pick
-      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
-          * minPartitionFactor));
-      minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
-
-      // max we allow spark to pick
-      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
-      maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
-
-      reduceWork.setMinReduceTasks(minPartition);
-      reduceWork.setMaxReduceTasks(maxPartition);
-    }
-
     setupReduceSink(context, reduceWork, reduceSink);
 
     sparkWork.add(reduceWork);
 
-    SparkEdgeProperty edgeProp;
-    if (reduceWork.isAutoReduceParallelism()) {
-      edgeProp =
-          new SparkEdgeProperty(0);
-    } else {
-      edgeProp = new SparkEdgeProperty(0);
-    }
+    SparkEdgeProperty edgeProp = new SparkEdgeProperty(0, reduceWork.getNumReduceTasks());
 
     sparkWork.connect(
         context.preceedingWork,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
new file mode 100644
index 0000000..712a108
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
@@ -0,0 +1,48 @@
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * OptimizeSparkProcContext. OptimizeSparkProcContext maintains information
+ * about the current operator plan as we walk the operator tree
+ * to do some additional optimizations on it.
+ *
+ */
+public class OptimizeSparkProcContext implements NodeProcessorCtx {
+
+  public final ParseContext parseContext;
+  public final HiveConf conf;
+
+  public final Set<ReadEntity> inputs;
+  public final Set<WriteEntity> outputs;
+
+  public final Set<ReduceSinkOperator> visitedReduceSinks
+      = new HashSet<ReduceSinkOperator>();
+
+  // rootOperators are all the table scan operators in sequence
+  // of traversal
+  public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+
+  @SuppressWarnings("unchecked")
+  public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+      Deque<Operator<? extends OperatorDesc>> rootOperators) {
+
+    this.conf = conf;
+    this.parseContext = parseContext;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.rootOperators = rootOperators;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 3840318..b80c349 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -19,8 +19,10 @@
 import java.io.Serializable;
 
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -48,18 +51,19 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TaskCompiler;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TaskCompiler;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -87,7 +91,6 @@ public void init(HiveConf conf, LogHelper console, Hive db) {
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     // TODO: need to add spark specific optimization.
-/*
     // Sequence of TableScan operators to be walked
     Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
     deque.addAll(pCtx.getTopOps().values());
@@ -101,10 +104,10 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
         ReduceSinkOperator.getOperatorName() + "%"),
-        new SetReducerParallelism());
+        new SetSparkReducerParallelism());
 
-    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
-        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+//    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+//        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -113,7 +116,6 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
-*/
   }
 
   /**