diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 762ce7d..d2c5245 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
@@ -220,6 +222,20 @@ private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
     boolean caching = isCachingWork(work, sparkWork);
     if (work instanceof MapWork) {
+      // Create tmp dir for MergeFileWork
+      if (work instanceof MergeFileWork) {
+        Path outputPath = ((MergeFileWork) work).getOutputDir();
+        Path tempOutPath = Utilities.toTempPath(outputPath);
+        FileSystem fs = outputPath.getFileSystem(jobConf);
+        try {
+          if (!fs.exists(tempOutPath)) {
+            fs.mkdirs(tempOutPath);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "Can't make path " + outputPath, e);
+        }
+      }
       MapTran mapTran = new MapTran(caching);
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);