From a0149dfc34c987135969b2d240cc21dd0afecd70 Mon Sep 17 00:00:00 2001
From: Na Yang
Date: Tue, 12 Aug 2014 11:39:58 -0700
Subject: [PATCH] HIVE-7541: Support union all on Spark

---
 .../hadoop/hive/ql/exec/spark/ChainedTran.java     |  43 -------
 .../hadoop/hive/ql/exec/spark/GraphTran.java       | 126 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/SparkClient.java     |  55 +++++----
 .../hadoop/hive/ql/exec/spark/SparkPlan.java       |  24 +---
 .../hive/ql/exec/spark/SparkPlanGenerator.java     | 112 ++++++++++++------
 .../hadoop/hive/ql/exec/spark/UnionTran.java       |  40 +++++++
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java  |   3 -
 7 files changed, 284 insertions(+), 119 deletions(-)
 delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
deleted file mode 100644
index 4991568..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.spark;
-
-import java.util.List;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.spark.api.java.JavaPairRDD;
-
-public class ChainedTran implements SparkTran {
-  private List<SparkTran> trans;
-
-  public ChainedTran(List<SparkTran> trans) {
-    this.trans = trans;
-  }
-
-  @Override
-  public JavaPairRDD<BytesWritable, BytesWritable> transform(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
-    JavaPairRDD<BytesWritable, BytesWritable> result = input;
-    for (SparkTran tran : trans) {
-      result = tran.transform(result);
-    }
-    return result;
-  }
-
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
new file mode 100644
index 0000000..8a3422e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class GraphTran {
+
+  private Set<SparkTran> rootTrans = new HashSet<SparkTran>();
+  private Set<SparkTran> leafTrans = new HashSet<SparkTran>();
+  private Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>> unionInputs = new HashMap<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>>();
+  private Map<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>> mapInputs = new HashMap<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>>();
+
+  public void addTran(SparkTran tran) {
+    addTranWithInput(tran, null);
+  }
+
+  public void addTranWithInput(SparkTran tran,
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    if (!rootTrans.contains(tran)) {
+      rootTrans.add(tran);
+      leafTrans.add(tran);
+      transGraph.put(tran, new LinkedList<SparkTran>());
+      invertedTransGraph.put(tran, new LinkedList<SparkTran>());
+    }
+    if (input != null) {
+      mapInputs.put(tran, input);
+    }
+  }
+
+  public void execute() throws Exception {
+    JavaPairRDD<BytesWritable, BytesWritable> resultRDD = null;
+    for (SparkTran tran : rootTrans) {
+      // make sure all the root trans are MapTran
+      if (!(tran instanceof MapTran)) {
+        throw new Exception("root transformations must be MapTran!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> input = mapInputs.get(tran);
+      if (input == null) {
+        throw new Exception("input is missing for transformation!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+
+      while (getChildren(tran).size() > 0) {
+        SparkTran childTran = getChildren(tran).get(0);
+        if (childTran instanceof UnionTran) {
+          List<JavaPairRDD<BytesWritable, BytesWritable>> unionInputList = unionInputs
+              .get(childTran);
+          if (unionInputList == null) {
+            // process the first union input RDD, cache it in the hash map
+            unionInputList = new LinkedList<JavaPairRDD<BytesWritable, BytesWritable>>();
+            unionInputList.add(rdd);
+            unionInputs.put(childTran, unionInputList);
+            break;
+          } else if (unionInputList.size() < this.getParents(childTran).size() - 1) {
+            // not the last input RDD yet, continue caching it in the hash map
+            unionInputList.add(rdd);
+            break;
+          } else if (unionInputList.size() == this.getParents(childTran).size() - 1) {
+            // process the last input RDD
+            for (JavaPairRDD<BytesWritable, BytesWritable> inputRDD : unionInputList) {
+              ((UnionTran) childTran).setOtherInput(inputRDD);
+              rdd = childTran.transform(rdd);
+            }
+          }
+        } else {
+          rdd = childTran.transform(rdd);
+        }
+        tran = childTran;
+      }
+      resultRDD = rdd;
+    }
+    if (resultRDD != null) {
+      resultRDD.foreach(HiveVoidFunction.getInstance());
+    }
+  }
+
+  public void connect(SparkTran a, SparkTran b) {
+    transGraph.get(a).add(b);
+    invertedTransGraph.get(b).add(a);
+    rootTrans.remove(b);
+    leafTrans.remove(a);
+  }
+
+  public List<SparkTran> getParents(SparkTran tran) throws Exception {
+    if (!invertedTransGraph.containsKey(tran)
+        || invertedTransGraph.get(tran) == null) {
+      throw new Exception("Cannot get parent transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(invertedTransGraph.get(tran));
+  }
+
+  public List<SparkTran> getChildren(SparkTran tran) throws Exception {
+    if (!transGraph.containsKey(tran) || transGraph.get(tran) == null) {
+      throw new Exception("Cannot get children transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(transGraph.get(tran));
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 743717d..bfb34ec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -42,7 +42,8 @@ public class SparkClient implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
+  protected static transient final Log LOG = LogFactory
+      .getLog(SparkClient.class);
 
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
@@ -73,12 +74,14 @@ private SparkConf initiateSparkConf(Configuration hiveConf) {
     // set default spark configurations.
     sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
-    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set("spark.default.parallelism", "1");
+    sparkConf.set("spark.serializer",
+        "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.default.parallelism", "1");
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
     try {
-      inputStream = this.getClass().getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      inputStream = this.getClass().getClassLoader()
+          .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
       if (inputStream != null) {
         LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
         Properties properties = new Properties();
@@ -87,13 +90,15 @@ private SparkConf initiateSparkConf(Configuration hiveConf) {
           if (propertyName.startsWith("spark")) {
            String value = properties.getProperty(propertyName);
            sparkConf.set(propertyName, properties.getProperty(propertyName));
-            LOG.info(String.format("load spark configuration from %s (%s -> %s).",
+            LOG.info(String.format(
+                "load spark configuration from %s (%s -> %s).",
                SPARK_DEFAULT_CONF_FILE, propertyName, value));
          }
        }
      }
    } catch (IOException e) {
-      LOG.info("Failed to open spark configuration file:" + SPARK_DEFAULT_CONF_FILE, e);
+      LOG.info("Failed to open spark configuration file:"
+          + SPARK_DEFAULT_CONF_FILE, e);
    } finally {
      if (inputStream != null) {
        try {
@@ -112,7 +117,8 @@ private SparkConf initiateSparkConf(Configuration hiveConf) {
      if (propertyName.startsWith("spark")) {
        String value = entry.getValue();
        sparkConf.set(propertyName, value);
-        LOG.info(String.format("load spark configuration from hive configuration (%s -> %s).",
+        LOG.info(String.format(
+            "load spark configuration from hive configuration (%s -> %s).",
            propertyName, value));
      }
    }
@@ -122,7 +128,7 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
     Context ctx = driverContext.getCtx();
-    HiveConf hiveConf = (HiveConf)ctx.getConf();
+    HiveConf hiveConf = (HiveConf) ctx.getConf();
     refreshLocalResources(sparkWork, hiveConf);
     JobConf jobConf = new JobConf(hiveConf);
 
@@ -138,7 +144,8 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
     }
 
     // Generate Spark plan
-    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf,
+        emptyScratchDir);
     SparkPlan plan;
     try {
       plan = gen.generate(sparkWork);
@@ -148,8 +155,12 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
     }
 
     // Execute generated plan.
-    // TODO: we should catch any exception and return more meaningful error code.
-    plan.execute();
+    try {
+      plan.execute();
+    } catch (Exception e) {
+      LOG.error("Error executing Spark Plan", e);
+      return 1;
+    }
 
     return 0;
   }
@@ -167,7 +178,8 @@ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
     }
 
     // add added jars
-    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    String addedJars = Utilities.getResourceFiles(conf,
+        SessionState.ResourceType.JAR);
     if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
       addJars(addedJars);
@@ -194,16 +206,20 @@ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
       }
     }
 
-    //add added files
-    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
-    if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
+    // add added files
+    String addedFiles = Utilities.getResourceFiles(conf,
+        SessionState.ResourceType.FILE);
+    if (StringUtils.isNotEmpty(addedFiles)
+        && StringUtils.isNotBlank(addedFiles)) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
       addResources(addedFiles);
     }
 
     // add added archives
-    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
-    if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
+    String addedArchives = Utilities.getResourceFiles(conf,
+        SessionState.ResourceType.ARCHIVE);
+    if (StringUtils.isNotEmpty(addedArchives)
+        && StringUtils.isNotBlank(addedArchives)) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
       addResources(addedArchives);
     }
@@ -211,7 +227,8 @@ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
 
   private void addResources(String addedFiles) {
     for (String addedFile : addedFiles.split(",")) {
-      if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
+      if (StringUtils.isNotEmpty(addedFile)
+          && StringUtils.isNotBlank(addedFile)
           && !localFiles.contains(addedFile)) {
         localFiles.add(addedFile);
         sc.addFile(addedFile);
@@ -222,7 +239,7 @@ private void addResources(String addedFiles) {
   private void addJars(String addedJars) {
     for (String addedJar : addedJars.split(",")) {
       if (StringUtils.isNotEmpty(addedJar) && StringUtils.isNotBlank(addedJar)
-        && !localJars.contains(addedJar)) {
+          && !localJars.contains(addedJar)) {
         localJars.add(addedJar);
         sc.addJar(addedJar);
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b24f3d0..46e4b6d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -18,31 +18,19 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.spark.api.java.JavaPairRDD;
-
 public class SparkPlan {
-  private JavaPairRDD<BytesWritable, BytesWritable> input;
-  private SparkTran tran;
 
-  public void execute() {
-    JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
-    rdd.foreach(HiveVoidFunction.getInstance());
-  }
+  private GraphTran tran;
 
-  public SparkTran getTran() {
-    return tran;
+  public void execute() throws Exception {
+    tran.execute();
   }
 
-  public void setTran(SparkTran tran) {
+  public void setTran(GraphTran tran) {
     this.tran = tran;
   }
 
-  public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
-    return input;
-  }
-
-  public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
-    this.input = input;
+  public GraphTran getTran() {
+    return tran;
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 434d25f..345133d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -19,8 +19,9 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.BytesWritable;
@@ -50,8 +52,10 @@
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
+  private Map<BaseWork, SparkTran> unionWorkTrans = new HashMap<BaseWork, SparkTran>();
 
-  public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
+  public SparkPlanGenerator(JavaSparkContext sc, Context context,
+      JobConf jobConf, Path scratchDir) {
     this.sc = sc;
     this.context = context;
     this.jobConf = jobConf;
@@ -60,37 +64,63 @@ public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf,
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
     SparkPlan plan = new SparkPlan();
-    List<SparkTran> trans = new ArrayList<SparkTran>();
+    GraphTran trans = new GraphTran();
     Set<BaseWork> roots = sparkWork.getRoots();
-    assert(roots != null && roots.size() == 1);
-    BaseWork w = roots.iterator().next();
-    MapWork mapWork = (MapWork) w;
-    trans.add(generate(w));
-    while (sparkWork.getChildren(w).size() > 0) {
-      ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
-      SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
-      SparkShuffler st = generate(edge);
-      ReduceTran rt = generate(child);
-      rt.setShuffler(st);
-      rt.setNumPartitions(edge.getNumPartitions());
-      trans.add(rt);
-      w = child;
+    for (BaseWork w : roots) {
+      if (!(w instanceof MapWork)) {
+        throw new Exception(
+            "The roots in the SparkWork must be MapWork instances!");
+      }
+      MapWork mapWork = (MapWork) w;
+      SparkTran tran = generate(w);
+      JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+      trans.addTranWithInput(tran, input);
+
+      while (sparkWork.getChildren(w).size() > 0) {
+        BaseWork child = sparkWork.getChildren(w).get(0);
+        if (child instanceof ReduceWork) {
+          SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+          SparkShuffler st = generate(edge);
+          ReduceTran rt = generate((ReduceWork) child);
+          rt.setShuffler(st);
+          rt.setNumPartitions(edge.getNumPartitions());
+          trans.addTran(rt);
+          trans.connect(tran, rt);
+          w = child;
+          tran = rt;
+        } else if (child instanceof UnionWork) {
+          if (unionWorkTrans.get(child) != null) {
+            trans.connect(tran, unionWorkTrans.get(child));
+            break;
+          } else {
+            SparkTran ut = generate((UnionWork) child);
+            unionWorkTrans.put(child, ut);
+            trans.addTran(ut);
+            trans.connect(tran, ut);
+            w = child;
+            tran = ut;
+          }
+        }
+      }
     }
-    ChainedTran chainedTran = new ChainedTran(trans);
-    plan.setTran(chainedTran);
-    JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
-    plan.setInput(input);
+    unionWorkTrans.clear();
+    plan.setTran(trans);
     return plan;
   }
 
-  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
-    List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
-    Utilities.setInputPaths(jobConf, inputPaths);
+  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork)
+      throws Exception {
+    JobConf newJobConf = new JobConf(jobConf);
+    List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mapWork,
+        scratchDir, context, false);
+    Utilities.setInputPaths(newJobConf, inputPaths);
+    Utilities.setMapWork(newJobConf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
 
     // The mapper class is expected by the HiveInputFormat.
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
+    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    return sc.hadoopRDD(newJobConf, ifClass, WritableComparable.class,
+        Writable.class);
   }
 
   private SparkTran generate(BaseWork bw) throws IOException, HiveException {
@@ -100,35 +130,40 @@ private SparkTran generate(BaseWork bw) throws IOException, HiveException {
       StatsFactory factory = StatsFactory.newFactory(jobConf);
       if (factory != null) {
         statsPublisher = factory.getStatsPublisher();
-        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
-          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
-            throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+        if (!statsPublisher.init(jobConf)) { // creating stats table if not
+                                             // exists
+          if (HiveConf.getBoolVar(jobConf,
+              HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw new HiveException(
+                ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
          }
        }
      }
    }
    if (bw instanceof MapWork) {
-      return generate((MapWork)bw);
+      return generate((MapWork) bw);
    } else if (bw instanceof ReduceWork) {
-      return generate((ReduceWork)bw);
+      return generate((ReduceWork) bw);
    } else {
-      throw new IllegalArgumentException("Only MapWork and ReduceWork are expected");
+      throw new IllegalArgumentException(
+          "Only MapWork and ReduceWork are expected");
    }
  }
 
   private MapTran generate(MapWork mw) throws IOException {
+    JobConf newJobConf = new JobConf(jobConf);
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, false);
-    Utilities.createTmpDirs(jobConf, mw);
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    Utilities.setMapWork(newJobConf, mw, scratchDir, true);
+    Utilities.createTmpDirs(newJobConf, mw);
+    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
     HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
     result.setMapFunction(mapFunc);
     return result;
   }
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
-    if (edge.isShuffleSort()){
+    if (edge.isShuffleSort()) {
       return new SortByShuffler();
     }
     return new GroupByShuffler();
@@ -148,4 +183,9 @@ private ReduceTran generate(ReduceWork rw) throws IOException {
     return result;
   }
 
+  private UnionTran generate(UnionWork uw) {
+    UnionTran result = new UnionTran();
+    return result;
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
new file mode 100644
index 0000000..5ec7d0f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class UnionTran implements SparkTran {
+  JavaPairRDD<BytesWritable, BytesWritable> otherInput;
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.union(otherInput);
+  }
+
+  public void setOtherInput(JavaPairRDD<BytesWritable, BytesWritable> otherInput) {
+    this.otherInput = otherInput;
+  }
+
+  public JavaPairRDD<BytesWritable, BytesWritable> getOtherInput() {
+    return this.otherInput;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e4cb6f3..482bd13 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -230,9 +230,6 @@ public void removeUnionOperators(Configuration conf, GenSparkProcContext context
       }
       linked = context.linkedFileSinks.get(path);
       linked.add(desc);
-
-      desc.setDirName(new Path(path, ""+linked.size()));
-      desc.setLinkedFileSinkDesc(linked);
     }
 
     if (current instanceof UnionOperator) {
-- 
1.8.5.2 (Apple Git-48)