diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index d9c546a..37bb4a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -947,6 +947,8 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc
    * A FileSinkOperator is added after parent to output the data.
    * Before child, we add a TableScanOperator to load data stored in the temporary
    * file back.
+   *
+   * TODO: for hive on spark, we changed it to public, but ideally we should change it back to protected.
    * @param parent
    * @param child
    * @param taskTmpDir
@@ -954,7 +956,7 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc
    * @param parseCtx
    * @return The TableScanOperator inserted before child.
    */
-  protected static TableScanOperator createTemporaryFile(
+  public static TableScanOperator createTemporaryFile(
       Operator parent, Operator child, Path taskTmpDir,
       TableDesc tt_desc, ParseContext parseCtx) {
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 5ddc16d..58ab38d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -19,23 +19,11 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
 import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -47,12 +35,11 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 /**
- * GenSparkProcContext maintains information about the tasks and operators 
+ * GenSparkProcContext maintains information about the tasks and operators
  * as we walk the operator tree to break them into SparkTasks.
- * 
+ *
  * Cloned from GenTezProcContext.
  *
  */
@@ -85,6 +72,12 @@
   // one.
   public BaseWork preceedingWork;
 
+  // All parallel RSs that can be optimized for multi-insertion
+  public final Set<ReduceSinkOperator> parallelReduceSinks;
+
+  // A map from operator to the task that it belongs to
+  public final Map<Operator<?>, SparkTask> opToTaskTable;
+
   // map that keeps track of the last operator of a task to the work
   // that follows it. This is used for connecting them later.
   public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
@@ -142,8 +135,7 @@ public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
     this.rootTasks = rootTasks;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.currentTask = (SparkTask) TaskFactory.get(
-        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    this.currentTask = null;
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
@@ -153,8 +145,9 @@ public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
     this.mapJoinParentMap = new LinkedHashMap<MapJoinOperator, List<Operator<?>>>();
     this.currentMapJoinOperators = new LinkedHashSet();
     this.linkChildOpWithDummyOp = new LinkedHashMap<Operator<?>, List<Operator<?>>>();
-    this.dependencyTask = (DependencyCollectionTask)
-        TaskFactory.get(new DependencyCollectionWork(), conf);
+    //this.dependencyTask = (DependencyCollectionTask)
+    //    TaskFactory.get(new DependencyCollectionWork(), conf);
+    this.dependencyTask = null;
     this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.currentUnionOperators = new LinkedList();
     this.workWithUnionOperators = new LinkedHashSet();
@@ -162,8 +155,8 @@ public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
     this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
     this.fileSinkSet = new LinkedHashSet();
     this.connectedReduceSinks = new LinkedHashSet();
-
-    rootTasks.add(currentTask);
+    this.parallelReduceSinks = new HashSet<ReduceSinkOperator>();
+    this.opToTaskTable = new HashMap<Operator<?>, SparkTask>();
+    // rootTasks.add(currentTask);
   }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 379a39c..79a0b65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
@@ -157,6 +158,14 @@ public MapWork createMapWork(GenSparkProcContext context, Operator root,
     return mapWork;
   }
 
+  public MapWork createMapWork(GenSparkProcContext context, TableScanOperator root,
+      SparkWork sparkWork, String path, TableDesc tt_desc) throws SemanticException {
+    MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
+    GenMapRedUtils.setTaskPlan(path, path, root, mapWork, false, tt_desc);
+    sparkWork.add(mapWork);
+    return mapWork;
+  }
+
   // this method's main use is to help unit testing this class
   protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
       PrunedPartitionList partitions, Operator root,
@@ -216,7 +225,7 @@ public void removeUnionOperators(Configuration conf, GenSparkProcContext context
       FileSinkOperator fileSink = (FileSinkOperator)current;
 
       // remember it for additional processing later
-      context.fileSinkSet.add(fileSink);
+      // context.fileSinkSet.add(fileSink);
 
       FileSinkDesc desc = fileSink.getConf();
       Path path = desc.getDirName();
@@ -269,15 +278,16 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       throws SemanticException {
 
     ParseContext parseContext = context.parseContext;
+    SparkTask currTask = context.opToTaskTable.get(fileSink);
 
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
         GenMapRedUtils.isInsertInto(parseContext, fileSink);
     HiveConf hconf = parseContext.getConf();
 
     boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, context.currentTask, isInsertTable);
+        hconf, fileSink, currTask, isInsertTable);
 
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+    Path finalName = GenMapRedUtils.createMoveTask(currTask,
         chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
 
     if (chDir) {
@@ -286,13 +296,13 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       logger.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
+          hconf, currTask);
     }
 
     FetchTask fetchTask = parseContext.getFetchTask();
-    if (fetchTask != null && context.currentTask.getNumChild() == 0) {
+    if (fetchTask != null && currTask.getNumChild() == 0) {
       if (fetchTask.isFetchFrom(fileSink.getConf())) {
-        context.currentTask.setFetchSource(true);
+        currTask.setFetchSource(true);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index 864965e..518fcd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -81,6 +81,7 @@ public Object process(Node nd, Stack stack,
     // Operator is a file sink or reduce sink. Something that forces
     // a new vertex.
     Operator operator = (Operator) nd;
+    context.opToTaskTable.put(operator, context.currentTask);
 
     // root is the start of the operator pipeline we're currently
     // packing into a vertex, typically a table scan, union or join
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index dc621cf..d75b9b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -32,13 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -71,9 +65,9 @@
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
- * 
+ *
  * Pretty much cloned from TezCompiler.
- * 
+ *
  * TODO: need to complete and make it fit to Spark.
  */
 public class SparkCompiler extends TaskCompiler {
@@ -85,7 +79,7 @@ public SparkCompiler() {
   @Override
   public void init(HiveConf conf, LogHelper console, Hive db) {
     super.init(conf, console, db);
-    
+
     // TODO: Need to check if we require the use of recursive input dirs for union processing
     // conf.setBoolean("mapred.input.dir.recursive", true);
     // HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
@@ -138,11 +132,25 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
     GenSparkProcContext procCtx = new GenSparkProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
 
+    NodeProcessor setCurrTaskProcessor = new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack stack,
+          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+        GenSparkProcContext context = (GenSparkProcContext) procCtx;
+        Map<Operator<?>, SparkTask> opToTaskTable = context.opToTaskTable;
+        SparkTask task = opToTaskTable.get(stack.get(stack.size()-2));
+        opToTaskTable.put((Operator) nd, task);
+        context.currentTask = task;
+        return null;
+      }
+    };
+
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
     Map opRules = new LinkedHashMap();
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
-        ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
+        ReduceSinkOperator.getOperatorName() + "%"),
+        new CompositeProcessor(genSparkWork, new SparkMultiInsertionProcessor(GenSparkUtils.getUtils())));
 
     // opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
     //     MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc());
@@ -153,7 +161,8 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
 
     opRules.put(new RuleRegExp("Handle Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
-        new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
+        new CompositeProcessor(new SparkTableScanProcessor(),
+            new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())));
 
     opRules.put(new RuleRegExp("Remember union",
         UnionOperator.getOperatorName() + "%"), new NodeProcessor() {
@@ -162,7 +171,26 @@ public Object process(Node n, Stack s,
           NodeProcessorCtx procCtx, Object... os) throws SemanticException {
         GenSparkProcContext context = (GenSparkProcContext) procCtx;
         UnionOperator union = (UnionOperator) n;
+        Map<Operator<?>, SparkTask> opToTaskTable = context.opToTaskTable;
+        // Check if we've already created a task for this operator; if so, merge into that task.
+        if (opToTaskTable.containsKey(union)) {
+          SparkTask existingTask = opToTaskTable.get(union);
+          // Update the existing operators on the stack
+          for (Node nd : s) {
+            opToTaskTable.put((Operator) nd, existingTask);
+          }
+          opToTaskTable.put(union, existingTask);
+          // Also, remove the old task from the root task set if possible
+          if (context.rootTasks.contains(context.currentTask)) {
+            context.rootTasks.remove(context.currentTask);
+          }
+          context.currentTask = existingTask;
+        } else {
+          SparkTask task = opToTaskTable.get(s.get(s.size()-2));
+          opToTaskTable.put((Operator) n, task);
+          context.currentTask = task;
+        }
 
         // simply need to remember that we've seen a union.
         context.currentUnionOperators.add(union);
         return null;
@@ -171,7 +199,7 @@ public Object process(Node n, Stack s,
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    Dispatcher disp = new DefaultRuleDispatcher(setCurrTaskProcessor, opRules, procCtx);
     List topNodes = new ArrayList();
     topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
new file mode 100644
index 0000000..0624eca
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import com.clearspring.analytics.util.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.*;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+public class SparkMultiInsertionProcessor implements NodeProcessor {
+  private GenSparkUtils utils;
+
+  public SparkMultiInsertionProcessor(GenSparkUtils utils) {
+    this.utils = utils;
+  }
+
+  @Override
+  public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object...
+      nodeOutputs) throws SemanticException {
+    ReduceSinkOperator op = (ReduceSinkOperator) nd;
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+    if (context.parallelReduceSinks.contains(op)) {
+      splitPlan(op, context);
+      context.parallelReduceSinks.remove(op);
+    }
+
+    return null;
+  }
+
+  /**
+   * Called when cRS is met in a pOP(parentTask with RS)-cRS-cOP(noTask) chain.
+   * Creates a new child task for cRS-cOP and links the two tasks by a temporary file: pOP-FS / TS-cRS-cOP.
+   *
+   * @param cRS
+   *          the reduce sink operator encountered
+   * @param opProcCtx
+   *          processing context
+   */
+  private void splitPlan(ReduceSinkOperator cRS, GenSparkProcContext opProcCtx)
+      throws SemanticException {
+    // Generate a new task
+    ParseContext parseCtx = opProcCtx.parseContext;
+    SparkTask parentTask = opProcCtx.currentTask;
+
+    SparkWork childPlan = new SparkWork(opProcCtx.conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+    SparkTask childTask = (SparkTask) TaskFactory.get(childPlan, parseCtx.getConf());
+    Operator reducer = cRS.getChildOperators().get(0);
+    opProcCtx.opToTaskTable.put(reducer, childTask);
+    splitTasks(cRS, parentTask, childTask, opProcCtx);
+  }
+
+  /**
+   * Split two tasks by creating a temporary file between them.
+   *
+   * @param op reduce sink operator being processed
+   * @param parentTask the parent task
+   * @param childTask the child task
+   * @param context processing context
+   **/
+  @SuppressWarnings("nls")
+  private void splitTasks(ReduceSinkOperator op,
+      SparkTask parentTask, SparkTask childTask,
+      GenSparkProcContext context) throws SemanticException {
+    Preconditions.checkArgument(op.getNumParent() == 1,
+        "AssertionError: expecting operator " + op + " to have only one parent," +
+        " but found multiple parents : " + op.getParentOperators());
+    Preconditions.checkArgument(op.getNumChild() == 1,
+        "AssertionError: expecting operator " + op + " to have only one child," +
+        " but found multiple children : " + op.getChildOperators());
+
+    ParseContext parseCtx = context.parseContext;
+    parentTask.addDependentTask(childTask);
+
+    // Generate the temporary file name
+    Context baseCtx = parseCtx.getContext();
+    Path taskTmpDir = baseCtx.getMRTmpPath();
+
+    Operator parent = op.getParentOperators().get(0);
+    TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+        .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+
+    // Create the temporary file, its corresponding FileSinkOperator, and
+    // its corresponding TableScanOperator.
+    TableScanOperator tableScan =
+        GenMapRedUtils.createTemporaryFile(parent, op, taskTmpDir, tt_desc, parseCtx);
+
+    context.currentRootOperator = op.getChildOperators().get(0);
+    context.parentOfRoot = op;
+    context.currentTask = childTask;
+    String streamDesc = taskTmpDir.toUri().toString();
+    context.opToTaskTable.put(tableScan, childTask);
+    context.opToTaskTable.put(op, childTask);
+    MapWork mapWork = utils.createMapWork(context, tableScan,
+        childTask.getWork(), streamDesc, tt_desc);
+    context.rootToWorkMap.put(tableScan, mapWork);
+    context.preceedingWork = mapWork;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
new file mode 100644
index 0000000..8003f01
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Stack;
+
+public class SparkTableScanProcessor implements NodeProcessor {
+
+  @Override
+  public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+    TableScanOperator tblScan = (TableScanOperator) nd;
+    Set reduceSinks = new HashSet();
+
+    // First, set up the current task
+    SparkTask task = (SparkTask) TaskFactory.get(
+        new SparkWork(context.conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), context.conf);
+    context.opToTaskTable.put(tblScan, task);
+
+    // This TableScanOperator is not created by us, so it has no upstream dependency,
+    // and therefore its task should be a root task.
+    context.rootTasks.add(task);
+    context.currentTask = task;
+
+    // Second, for multi-insertion, do a DFS and collect the first RS seen on each path.
+    Stack<Operator<? extends OperatorDesc>> path = new Stack<Operator<? extends OperatorDesc>>();
+    path.push(tblScan);
+    while (!path.isEmpty()) {
+      Operator<? extends OperatorDesc> op = path.pop();
+      if (op instanceof UnionOperator) {
+        return null;
+      }
+      if (op instanceof ReduceSinkOperator) {
+        reduceSinks.add((ReduceSinkOperator) op);
+      }
+      else {
+        for (Operator childOp : op.getChildOperators()) {
+          path.push(childOp);
+        }
+      }
+    }
+
+    if (reduceSinks.size() > 1) {
+      context.parallelReduceSinks.addAll(reduceSinks);
+    }
+    return null;
+  }
+}
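
For context only (not part of the patch above): a minimal, self-contained sketch of the branch-collection idea that SparkTableScanProcessor implements — walk depth-first from the table scan, keep only the first ReduceSink met on each branch, and treat the branches as parallel multi-insert targets when more than one is found. The Op/ReduceSink/Union classes below are illustrative stand-ins, not Hive operator classes.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MultiInsertSketch {

  // Minimal stand-ins for an operator tree: a node has a name and children.
  static class Op {
    final String name;
    final List<Op> children = new ArrayList<Op>();
    Op(String name) { this.name = name; }
    Op add(Op child) { children.add(child); return this; }
  }
  static class ReduceSink extends Op { ReduceSink(String n) { super(n); } }
  static class Union extends Op { Union(String n) { super(n); } }

  // DFS from the table scan; keep only the FIRST reduce sink met on each path,
  // and give up if a union is found (mirrors the early return in the processor).
  static Set<Op> firstReduceSinks(Op tableScan) {
    Set<Op> result = new HashSet<Op>();
    Deque<Op> path = new ArrayDeque<Op>();
    path.push(tableScan);
    while (!path.isEmpty()) {
      Op op = path.pop();
      if (op instanceof Union) {
        return new HashSet<Op>();          // union plans are handled elsewhere
      }
      if (op instanceof ReduceSink) {
        result.add(op);                    // stop descending: first RS on this path
      } else {
        for (Op child : op.children) {
          path.push(child);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // A multi-insert shape: one scan feeding two independent RS branches.
    Op scan = new Op("TS");
    scan.add(new ReduceSink("RS-1").add(new Op("FS-1")));
    scan.add(new ReduceSink("RS-2").add(new Op("FS-2")));

    Set<Op> parallel = firstReduceSinks(scan);
    // With more than one first-level RS, the patch records them in
    // parallelReduceSinks so SparkMultiInsertionProcessor can split the plan later.
    System.out.println("parallel reduce sinks: " + parallel.size()); // prints 2
  }
}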