diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index b0ab495..4d2bcfa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -131,6 +133,8 @@
   public final List<UnionOperator> currentUnionOperators;
   public final Set<BaseWork> workWithUnionOperators;
 
+  // we link filesink that will write to the same final location
+  public final Map<Path, List<FileSinkDesc>> linkedFileSinks;
   public final Set<FileSinkOperator> fileSinkSet;
   public final Map<FileSinkOperator, List<FileSinkOperator>> fileSinkMap;
 
@@ -180,6 +184,7 @@ public GenSparkProcContext(HiveConf conf,
     this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.currentUnionOperators = new LinkedList<UnionOperator>();
     this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
+    this.linkedFileSinks = new LinkedHashMap<>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.fileSinkMap = new LinkedHashMap<FileSinkOperator, List<FileSinkOperator>>();
     this.pruningSinkSet = new LinkedHashSet<Operator<? extends OperatorDesc>>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 08602e2..18dd535 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
@@ -31,6 +32,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ForwardOperator;
@@ -43,6 +46,7 @@
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
@@ -53,7 +57,9 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
@@ -290,6 +296,26 @@ public void removeUnionOperators(GenSparkProcContext context, BaseWork work)
       Operator<?> current = operators.pop();
       seen.add(current);
 
+      if (current instanceof FileSinkOperator) {
+        FileSinkOperator fileSink = (FileSinkOperator)current;
+
+        // remember it for additional processing later
+        context.fileSinkSet.add(fileSink);
+
+        FileSinkDesc desc = fileSink.getConf();
+        Path path = desc.getDirName();
+        List<FileSinkDesc> linked;
+
+        if (!context.linkedFileSinks.containsKey(path)) {
+          linked = new ArrayList<FileSinkDesc>();
+          context.linkedFileSinks.put(path, linked);
+        }
+        linked = context.linkedFileSinks.get(path);
+        linked.add(desc);
+
+        desc.setLinkedFileSinkDesc(linked);
+      }
+
       if (current instanceof UnionOperator) {
         Operator<?> parent = null;
         int count = 0;
@@ -344,7 +370,7 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       }
     }
 
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+    Path finalName = createMoveTask(context.currentTask,
         chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
 
     if (chDir) {
@@ -365,6 +391,48 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
   }
 
   /**
+   * Create and add any dependent move tasks.
+   *
+   * This is forked from {@link GenMapRedUtils}. The difference is that it doesn't check
+   * 'isLinkedFileSink' and does not set parent dir for the linked file sinks.
+   */
+  public static Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+      FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+      HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+    Path dest = null;
+
+    if (chDir) {
+      dest = fsOp.getConf().getFinalDirName();
+
+      // generate the temporary file
+      // it must be on the same file system as the current destination
+      Context baseCtx = parseCtx.getContext();
+
+      Path tmpDir = baseCtx.getExternalTmpPath(dest);
+
+      FileSinkDesc fileSinkDesc = fsOp.getConf();
+      // Change all the linked file sink descriptors
+      for (FileSinkDesc fsConf: fileSinkDesc.getLinkedFileSinkDesc()) {
+        fsConf.setDirName(tmpDir);
+      }
+    }
+
+    Task<MoveWork> mvTask = null;
+
+    if (!chDir) {
+      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+    }
+
+    // Set the move task to be dependent on the current task
+    if (mvTask != null) {
+      GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+    }
+
+    return dest;
+  }
+
+  /**
    * Populate partition pruning information from the pruning sink operator to the
    * target MapWork (the MapWork for the big table side). The information include the source table
    * name, column name, and partition key expression. It also set up the temporary path used to
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 73e596e..dba711b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -424,8 +424,7 @@ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List<Node> topN
     opRules.put(new TypeRule(MapJoinOperator.class), new SparkReduceSinkMapJoinProc());
 
     opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
-        FileSinkOperator.getOperatorName() + "%"),
-        new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork));
+        FileSinkOperator.getOperatorName() + "%"), genSparkWork);
 
     opRules.put(new RuleRegExp("Handle Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
deleted file mode 100644
index 4cc127a..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse.spark;
-
-import java.util.Stack;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
-/**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks.
- * Cloned from tez's FileSinkProcessor.
- */
-public class SparkFileSinkProcessor implements NodeProcessor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SparkFileSinkProcessor.class.getName());
-
-  /*
-   * (non-Javadoc)
-   * we should ideally not modify the tree we traverse.
-   * However, since we need to walk the tree at any time when we modify the operator,
-   * we might as well do it here.
-   */
-  @Override
-  public Object process(Node nd, Stack<Node> stack,
-      NodeProcessorCtx procCtx, Object... nodeOutputs)
-      throws SemanticException {
-
-    GenSparkProcContext context = (GenSparkProcContext) procCtx;
-    FileSinkOperator fileSink = (FileSinkOperator) nd;
-
-    // just remember it for later processing
-    context.fileSinkSet.add(fileSink);
-    return true;
-  }
-
-}
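
For readers following the patch, the sketch below (not part of the change) illustrates the bookkeeping it introduces: after union removal clones the plan, every FileSinkDesc aimed at the same final directory is linked into one group, and the forked createMoveTask retargets the whole group to a single temporary directory so that one move task can publish the result. The class, field, and path names here (LinkedFileSinkSketch, SinkDesc, /warehouse/t/...) are hypothetical simplifications for illustration, not Hive APIs.

// Illustrative sketch only -- mimics the linkedFileSinks bookkeeping with a
// simplified stand-in for FileSinkDesc.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LinkedFileSinkSketch {

  // Hypothetical stand-in for Hive's FileSinkDesc; only what the sketch needs.
  static class SinkDesc {
    String dirName;            // current (possibly temporary) output directory
    List<SinkDesc> linked;     // all descriptors sharing the same final directory

    SinkDesc(String dirName) {
      this.dirName = dirName;
    }
  }

  // Group descriptors by final directory, as removeUnionOperators does when it
  // fills GenSparkProcContext.linkedFileSinks for the cloned plan.
  static Map<String, List<SinkDesc>> linkByFinalDir(List<SinkDesc> sinks) {
    Map<String, List<SinkDesc>> linkedFileSinks = new LinkedHashMap<>();
    for (SinkDesc desc : sinks) {
      List<SinkDesc> linked =
          linkedFileSinks.computeIfAbsent(desc.dirName, k -> new ArrayList<>());
      linked.add(desc);
      desc.linked = linked;    // analogous to desc.setLinkedFileSinkDesc(linked)
    }
    return linkedFileSinks;
  }

  // Point the whole linked group at one shared temporary directory, as the
  // forked createMoveTask does before a single move task publishes the result.
  static void retargetToTmp(SinkDesc anySinkInGroup, String tmpDir) {
    for (SinkDesc desc : anySinkInGroup.linked) {
      desc.dirName = tmpDir;
    }
  }

  public static void main(String[] args) {
    // Two branches of a UNION ALL insert that target the same final location.
    SinkDesc branch1 = new SinkDesc("/warehouse/t/final");
    SinkDesc branch2 = new SinkDesc("/warehouse/t/final");

    linkByFinalDir(Arrays.asList(branch1, branch2));
    retargetToTmp(branch1, "/warehouse/t/.staging-tmp");

    // Both branches now write into the shared staging directory, so one move
    // task can later publish it to /warehouse/t/final.
    System.out.println(branch1.dirName + " | " + branch2.dirName);
  }
}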