diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index abd4718..f207fd6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -1,14 +1,22 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class GroupByShuffler implements SparkShuffler {
 
+  private final Partitioner partitioner;
+
+  public GroupByShuffler(int partitionNumber) {
+    partitioner = new HashPartitioner(partitionNumber);
+  }
+
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
-    return input.groupByKey(/* default to hash partition */);
+    return input.groupByKey(partitioner);
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index f262065..66667d8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -28,6 +28,10 @@
 public class SortByShuffler implements SparkShuffler {
 
+  public SortByShuffler(int partitionNumber) {
+    // TODO: init ShuffleFunction with partitionNumber once ShuffleFunction is ready.
+  }
+
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 73553ee..0a3751c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -111,7 +111,8 @@ private MapTran generate(MapWork mw) throws IOException {
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
     // TODO: create different shuffler based on edge prop.
-    return new GroupByShuffler();
+    int partitionNumber = edge.getNumPartitions() <= 0 ? 1 : edge.getNumPartitions();
+    return new GroupByShuffler(partitionNumber);
   }
 
   private ReduceTran generate(ReduceWork rw) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index fb25596..830732a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -18,28 +18,48 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
+  private transient JobConf job;
+  private transient ContentSummary inputSummary;
+
+  @Override
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
+    job = new JobConf(conf, SparkTask.class);
+  }
 
   @Override
   public int execute(DriverContext driverContext) {
+    int rc = 1;
     SparkClient client = null;
     try {
+      configureNumberOfReducers();
       client = SparkClient.getInstance(driverContext.getCtx().getConf());
       rc = client.execute(driverContext, getWork());
-    } finally {
+    } catch (Exception e) {
+      LOG.error("Failed to execute spark task.", e);
+      return 1;
+    }
+    finally {
       if (client != null) {
         rc = close(rc);
       }
@@ -86,4 +106,52 @@ public String getName() {
     return "SPARK";
   }
 
+  /**
+   * Set the number of reducers for the spark work.
+   */
+  private void configureNumberOfReducers() throws IOException {
+    for (BaseWork baseWork : work.getAllWork()) {
+      if (baseWork instanceof ReduceWork) {
+        configureNumberOfReducers((ReduceWork) baseWork);
+      }
+    }
+
+    console.printInfo("In order to change the average load for a reducer (in bytes):");
+    console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
+    console.printInfo("In order to limit the maximum number of reducers:");
+    console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
+    console.printInfo("In order to set a constant number of reducers:");
+    console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
+  }
+
+  private void configureNumberOfReducers(ReduceWork rWork) throws IOException {
+    // this is a temporary hack to fix things that are not fixed in the compiler
+    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
+
+    if (rWork == null) {
+      console
+          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+    } else {
+      if (numReducersFromWork >= 0) {
+        console.printInfo("Number of reduce tasks determined at compile time: "
+            + rWork.getNumReduceTasks());
+      } else if (job.getNumReduceTasks() > 0) {
+        int reducers = job.getNumReduceTasks();
+        rWork.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+            + reducers);
+      } else {
+        if (inputSummary == null) {
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+        }
+        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+            false);
+        rWork.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+            + reducers);
+      }
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index d7e1fbf..9fd5f07 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -126,7 +126,8 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new StatsOptimizer());
     }
     if (pctx.getContext().getExplain() ||
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ||
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
new file mode 100644
index 0000000..31434be
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import java.util.Stack;
+
+/**
+ * SetSparkReducerParallelism determines how many reducers should
+ * be run for a given reduce sink, cloned from SetReducerParallelism.
+ */
+public class SetSparkReducerParallelism implements NodeProcessor {
+
+  private static final Log LOG = LogFactory.getLog(SetSparkReducerParallelism.class.getName());
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+
+    ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+    ReduceSinkDesc desc = sink.getConf();
+
+    long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+    int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+    if (context.getVisitedReduceSinks().contains(sink)) {
+      // skip walking the children
+      LOG.debug("Already processed reduce sink: " + sink.getName());
+      return true;
+    }
+
+    context.getVisitedReduceSinks().add(sink);
+
+    if (desc.getNumReducers() <= 0) {
+      if (constantReducers > 0) {
+        LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
+        desc.setNumReducers(constantReducers);
+      } else {
+        long numberOfBytes = 0;
+
+        // we need to add up all the estimates from the siblings of this reduce sink
+        for (Operator<? extends OperatorDesc> sibling:
+            sink.getChildOperators().get(0).getParentOperators()) {
+          if (sibling.getStatistics() != null) {
+            numberOfBytes += sibling.getStatistics().getDataSize();
+          } else {
+            LOG.warn("No stats available from: " + sibling);
+          }
+        }
+
+        int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+            maxReducers, false);
+        LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+        desc.setNumReducers(numReducers);
+        desc.setAutoParallel(true);
+      }
+    } else {
+      LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
+    }
+
+    return false;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 75a1033..e592e1e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -95,14 +95,6 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root
     Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be non-empty");
 
-    boolean isAutoReduceParallelism =
-        context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
-
-    float maxPartitionFactor =
-        context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
-    float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
-    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
     logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
@@ -119,36 +111,11 @@
 
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
-    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
-      reduceWork.setAutoReduceParallelism(true);
-
-      // configured limit for reducers
-      int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
-      // min we allow spark to pick
-      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
-          * minPartitionFactor));
-      minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
-
-      // max we allow spark to pick
-      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
-      maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
-
-      reduceWork.setMinReduceTasks(minPartition);
-      reduceWork.setMaxReduceTasks(maxPartition);
-    }
-
     setupReduceSink(context, reduceWork, reduceSink);
 
     sparkWork.add(reduceWork);
 
-    SparkEdgeProperty edgeProp;
-    if (reduceWork.isAutoReduceParallelism()) {
-      edgeProp =
-          new SparkEdgeProperty(0);
-    } else {
-      edgeProp = new SparkEdgeProperty(0);
-    }
+    SparkEdgeProperty edgeProp = new SparkEdgeProperty(0, reduceWork.getNumReduceTasks());
 
     sparkWork.connect(
         context.preceedingWork,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
new file mode 100644
index 0000000..0c339a5
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * OptimizeSparkProcContext. OptimizeSparkProcContext maintains information
+ * about the current operator plan as we walk the operator tree
+ * to do some additional optimizations on it. Cloned from OptimizeTezProcContext.
+ *
+ */
+public class OptimizeSparkProcContext implements NodeProcessorCtx {
+
+  private final ParseContext parseContext;
+  private final HiveConf conf;
+  private final Set<ReadEntity> inputs;
+  private final Set<WriteEntity> outputs;
+  private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>();
+
+  // rootOperators are all the table scan operators in sequence
+  // of traversal
+  private final Deque<Operator<? extends OperatorDesc>> rootOperators;
+
+  public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+      Deque<Operator<? extends OperatorDesc>> rootOperators) {
+
+    this.conf = conf;
+    this.parseContext = parseContext;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.rootOperators = rootOperators;
+  }
+
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public Set<ReadEntity> getInputs() {
+    return inputs;
+  }
+
+  public Set<WriteEntity> getOutputs() {
+    return outputs;
+  }
+
+  public Set<ReduceSinkOperator> getVisitedReduceSinks() {
+    return visitedReduceSinks;
+  }
+
+  public Deque<Operator<? extends OperatorDesc>> getRootOperators() {
+    return rootOperators;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 3840318..e740fdb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -19,8 +19,10 @@
 import java.io.Serializable;
 
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -48,18 +51,19 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TaskCompiler;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TaskCompiler;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -87,9 +91,8 @@ public void init(HiveConf conf, LogHelper console, Hive db) {
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     // TODO: need to add spark specific optimization.
-/*
     // Sequence of TableScan operators to be walked
-    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
     deque.addAll(pCtx.getTopOps().values());
 
     // Create the context for the walker
@@ -99,12 +102,13 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+    opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
        ReduceSinkOperator.getOperatorName() + "%"),
-        new SetReducerParallelism());
+        new SetSparkReducerParallelism());
 
-    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
-        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+    // TODO: research and verify support for the convert-join-to-map-join optimization.
+    //opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+    //    JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -113,7 +117,6 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
-*/
   }
 
   /**
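
For context on the fallback path in configureNumberOfReducers() above: when neither the compiler nor mapred.reduce.tasks supplies a value, Hive derives the reducer count from the total input size using hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max. The standalone sketch below illustrates that arithmetic, assuming ceil-divide-then-clamp semantics; the class and method names are illustrative only and are not part of the patch.

// ReducerEstimateSketch.java - illustrative sketch only, not part of the patch above.
public final class ReducerEstimateSketch {

  // Sketch of the estimate: ceil(totalBytes / bytesPerReducer), clamped to
  // [1, maxReducers]. An explicit constant (mapred.reduce.tasks > 0) wins outright.
  public static int estimate(long totalBytes, long bytesPerReducer,
      int maxReducers, int constantReducers) {
    if (constantReducers > 0) {
      return constantReducers;
    }
    int reducers = (int) Math.ceil((double) totalBytes / bytesPerReducer);
    reducers = Math.max(1, reducers);
    return Math.min(maxReducers, reducers);
  }

  public static void main(String[] args) {
    // 10 GB of input at 256 MB per reducer, capped at 1009 reducers -> prints 40
    System.out.println(estimate(10L << 30, 256L << 20, 1009, -1));
  }
}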