diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index b0ab495..4d2bcfa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -131,6 +133,8 @@
   public final List<UnionOperator> currentUnionOperators;
   public final Set<BaseWork> workWithUnionOperators;
 
+  // we link filesink that will write to the same final location
+  public final Map<Path, List<FileSinkDesc>> linkedFileSinks;
   public final Set<FileSinkOperator> fileSinkSet;
   public final Map<String, List<FileSinkOperator>> fileSinkMap;
 
@@ -180,6 +184,7 @@ public GenSparkProcContext(HiveConf conf,
     this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.currentUnionOperators = new LinkedList<UnionOperator>();
     this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
+    this.linkedFileSinks = new LinkedHashMap<>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.fileSinkMap = new LinkedHashMap<String, List<FileSinkOperator>>();
     this.pruningSinkSet = new LinkedHashSet<Operator<?>>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 08602e2..e1fc74f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
@@ -31,6 +32,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ForwardOperator;
@@ -43,6 +46,7 @@
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
@@ -53,7 +57,9 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
@@ -269,6 +275,11 @@ public void removeUnionOperators(GenSparkProcContext context, BaseWork work)
 
     Iterator<Operator<?>> it = newRoots.iterator();
     for (Operator<?> orig: roots) {
+      Set<FileSinkOperator> fsOpSet = OperatorUtils.findOperators(orig, FileSinkOperator.class);
+      for (FileSinkOperator fsOp : fsOpSet) {
+        context.fileSinkSet.remove(fsOp);
+      }
+
       Operator<?> newRoot = it.next();
       if (newRoot instanceof HashTableDummyOperator) {
         dummyOps.add((HashTableDummyOperator) newRoot);
@@ -290,6 +301,26 @@ public void removeUnionOperators(GenSparkProcContext context, BaseWork work)
       Operator<?> current = operators.pop();
       seen.add(current);
 
+      if (current instanceof FileSinkOperator) {
+        FileSinkOperator fileSink = (FileSinkOperator)current;
+
+        // remember it for additional processing later
+        context.fileSinkSet.add(fileSink);
+
+        FileSinkDesc desc = fileSink.getConf();
+        Path path = desc.getDirName();
+        List<FileSinkDesc> linked;
+
+        if (!context.linkedFileSinks.containsKey(path)) {
+          linked = new ArrayList<FileSinkDesc>();
+          context.linkedFileSinks.put(path, linked);
+        }
+        linked = context.linkedFileSinks.get(path);
+        linked.add(desc);
+
+        desc.setLinkedFileSinkDesc(linked);
+      }
+
       if (current instanceof UnionOperator) {
         Operator<?> parent = null;
         int count = 0;
@@ -344,7 +375,7 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       }
     }
 
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+    Path finalName = createMoveTask(context.currentTask,
         chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
 
     if (chDir) {
@@ -365,6 +396,48 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
   }
 
   /**
+   * Create and add any dependent move tasks.
+   *
+   * This is forked from {@link GenMapRedUtils}. The difference is that it doesn't check
+   * 'isLinkedFileSink' and does not set parent dir for the linked file sinks.
+   */
+  public static Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+      FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+      HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+    Path dest = null;
+
+    if (chDir) {
+      dest = fsOp.getConf().getFinalDirName();
+
+      // generate the temporary file
+      // it must be on the same file system as the current destination
+      Context baseCtx = parseCtx.getContext();
+
+      Path tmpDir = baseCtx.getExternalTmpPath(dest);
+
+      FileSinkDesc fileSinkDesc = fsOp.getConf();
+      // Change all the linked file sink descriptors
+      for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+        fsConf.setDirName(tmpDir);
+      }
+    }
+
+    Task<MoveWork> mvTask = null;
+
+    if (!chDir) {
+      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+    }
+
+    // Set the move task to be dependent on the current task
+    if (mvTask != null) {
+      GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+    }
+
+    return dest;
+  }
+
+  /**
    * Populate partition pruning information from the pruning sink operator to the
    * target MapWork (the MapWork for the big table side). The information include the source table
    * name, column name, and partition key expression. It also set up the temporary path used to
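
The linkedFileSinks map added to GenSparkProcContext and populated in removeUnionOperators groups every cloned FileSinkDesc by the final directory it writes to, then hands each descriptor a reference to the shared list via setLinkedFileSinkDesc. Below is a minimal standalone sketch of that grouping pattern, not Hive code: the SinkDesc class and plain String paths are illustrative stand-ins for FileSinkDesc and Path.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LinkedSinkGrouping {
  // Toy stand-in for FileSinkDesc: just the target directory and the shared list.
  static final class SinkDesc {
    final String dir;        // stand-in for FileSinkDesc.getDirName()
    List<SinkDesc> linked;   // stand-in for the list installed via setLinkedFileSinkDesc()

    SinkDesc(String dir) {
      this.dir = dir;
    }
  }

  public static void main(String[] args) {
    // analogue of GenSparkProcContext.linkedFileSinks
    Map<String, List<SinkDesc>> linkedFileSinks = new LinkedHashMap<>();
    SinkDesc[] sinks = {
        new SinkDesc("/warehouse/t1"), new SinkDesc("/warehouse/t1"), new SinkDesc("/warehouse/t2")
    };

    for (SinkDesc desc : sinks) {
      // same containsKey/put/get sequence the patch uses in removeUnionOperators
      if (!linkedFileSinks.containsKey(desc.dir)) {
        linkedFileSinks.put(desc.dir, new ArrayList<SinkDesc>());
      }
      List<SinkDesc> linked = linkedFileSinks.get(desc.dir);
      linked.add(desc);
      desc.linked = linked;
    }

    // both sinks targeting /warehouse/t1 now share one list and can see each other
    System.out.println(sinks[0].linked.size());  // 2
    System.out.println(sinks[2].linked.size());  // 1
  }
}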
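
The forked createMoveTask relies on that shared list: when an intermediate directory is needed (chDir), it derives a single temporary path from the final destination via Context.getExternalTmpPath and applies it to every linked descriptor, so all branches of the union stage their output under one directory. A rough sketch of that retargeting step follows; WriteDesc and deriveTmpPath are hypothetical stand-ins for FileSinkDesc and getExternalTmpPath, not Hive APIs.

import java.util.Arrays;
import java.util.List;

final class LinkedSinkRetarget {
  // Toy stand-in for FileSinkDesc: final destination, current write dir, shared linked list.
  static final class WriteDesc {
    final String finalDir;   // analogue of getFinalDirName()
    String writeDir;         // analogue of the directory changed via setDirName()
    List<WriteDesc> linked;  // shared list produced by the grouping step

    WriteDesc(String finalDir) {
      this.finalDir = finalDir;
      this.writeDir = finalDir;
    }
  }

  // Hypothetical stand-in for Context.getExternalTmpPath(dest).
  static String deriveTmpPath(String dest) {
    return dest + "/.hive-staging";
  }

  // Mirrors the chDir branch of the forked createMoveTask: retarget every linked
  // descriptor to one temporary directory and report the final destination.
  static String retarget(WriteDesc fsOp, boolean chDir) {
    String dest = null;
    if (chDir) {
      dest = fsOp.finalDir;
      String tmpDir = deriveTmpPath(dest);
      for (WriteDesc conf : fsOp.linked) {
        conf.writeDir = tmpDir;
      }
    }
    return dest;
  }

  public static void main(String[] args) {
    WriteDesc a = new WriteDesc("/warehouse/t1");
    WriteDesc b = new WriteDesc("/warehouse/t1");
    a.linked = Arrays.asList(a, b);
    b.linked = a.linked;

    System.out.println(retarget(a, true));  // /warehouse/t1
    System.out.println(b.writeDir);         // /warehouse/t1/.hive-staging, updated through the shared list
  }
}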