Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(revision 1148905)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(working copy)
@@ -52,10 +52,10 @@
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -100,6 +100,7 @@
   public class FSPaths implements Cloneable {
     Path tmpPath;
+    Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
@@ -110,6 +111,7 @@
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);
+      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
@@ -129,6 +131,14 @@
     /**
      * Update OutPath according to tmpPath.
      */
+    public Path getTaskOutPath(String taskId) {
+      return getOutPath(taskId, this.taskOutputTempPath);
+    }
+
+
+    /**
+     * Update OutPath according to tmpPath.
+     */
     public Path getOutPath(String taskId) {
       return getOutPath(taskId, this.tmpPath);
     }
@@ -182,14 +192,17 @@
     private void commit(FileSystem fs) throws HiveException {
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
+          if (bDynParts && !fs.exists(finalPaths[idx].getParent())) {
+            fs.mkdirs(finalPaths[idx].getParent());
+          }
           if (!fs.rename(outPaths[idx], finalPaths[idx])) {
-            throw new HiveException("Unable to rename output to: "
-                + finalPaths[idx]);
+            throw new HiveException("Unable to rename output from: "
+                + outPaths[idx] + " to: " + finalPaths[idx]);
           }
           updateProgress();
         } catch (IOException e) {
-          throw new HiveException(e + "Unable to rename output to: "
-              + finalPaths[idx]);
+          throw new HiveException("Unable to rename output from: "
+              + outPaths[idx] + " to: " + finalPaths[idx], e);
         }
       }
     }
@@ -425,7 +438,7 @@
       if (isNativeTable) {
         fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
         LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-        fsp.outPaths[filesIdx] = fsp.getOutPath(taskId);
+        fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
         LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
       } else {
         fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
@@ -616,6 +629,7 @@
       }
       fsp2 = new FSPaths(specPath);
       fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
+      fsp2.taskOutputTempPath = new Path(fsp2.taskOutputTempPath, dpDir);
      createBucketFiles(fsp2);
       valToPaths.put(dpDir, fsp2);
     }
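Note on the FSPaths change: each task now writes its in-progress files under a separate _task_tmp.<dir> tree (getTaskOutPath), and commit() renames each finished file into the _tmp.<dir> tree. A task that dies mid-write, for example a killed speculative attempt, therefore cannot leave a partial file where the job-level move looks for committed output. A minimal sketch of the resulting layout, assuming a spec path of /warehouse/t/10000 and a hypothetical task id 000000_0; the outPath shape follows the same pattern RCFileMergeMapper.updatePaths uses further down in this patch:

    import org.apache.hadoop.fs.Path;

    public class TaskTmpLayoutSketch {
      // Mirrors the prefix helpers this patch adds to Utilities.
      static Path toTempPath(Path p) {
        return new Path(p.getParent(), "_tmp." + p.getName());
      }
      static Path toTaskTempPath(Path p) {
        return new Path(p.getParent(), "_task_tmp." + p.getName());
      }

      public static void main(String[] args) {
        Path specPath = new Path("/warehouse/t/10000"); // hypothetical
        String taskId = "000000_0";                     // hypothetical

        // Where the writer writes while the task is running:
        Path outPath = new Path(toTaskTempPath(specPath), "_tmp." + taskId);
        // Where commit() renames the file once the task finishes:
        Path finalPath = new Path(toTempPath(specPath), taskId);

        System.out.println(outPath);   // /warehouse/t/_task_tmp.10000/_tmp.000000_0
        System.out.println(finalPath); // /warehouse/t/_tmp.10000/000000_0
      }
    }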
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1148905)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -108,6 +108,7 @@
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
+  @Override
   public boolean requireLock() {
     return true;
   }
@@ -205,6 +206,44 @@
     return false;
   }
 
+  protected void createTmpDirs() throws IOException {
+    // fix up outputs
+    Map<String, ArrayList<String>> pa = work.getPathToAliases();
+    if (pa != null) {
+      ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+
+      if (work.getReducer() != null) {
+        opList.add(work.getReducer());
+      }
+
+      for (List<String> ls : pa.values()) {
+        for (String a : ls) {
+          opList.add(work.getAliasToWork().get(a));
+
+          while (!opList.isEmpty()) {
+            Operator<? extends Serializable> op = opList.remove(0);
+
+            if (op instanceof FileSinkOperator) {
+              FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+              String tempDir = fdesc.getDirName();
+
+              if (tempDir != null) {
+                Path tempPath = Utilities.toTempPath(new Path(tempDir));
+                LOG.info("Making Temp Directory: " + tempDir);
+                FileSystem fs = tempPath.getFileSystem(job);
+                fs.mkdirs(tempPath);
+              }
+            }
+
+            if (op.getChildOperators() != null) {
+              opList.addAll(op.getChildOperators());
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Execute a query plan using Hadoop.
    */
@@ -405,6 +444,8 @@
       }
     }
 
+    this.createTmpDirs();
+
     // Finally SUBMIT the JOB!
     rj = jc.submitJob(job);
     // replace it back
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1148905)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -117,8 +117,8 @@
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -133,8 +133,8 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -1012,7 +1012,19 @@
   }
 
   private static final String tmpPrefix = "_tmp.";
+  private static final String taskTmpPrefix = "_task_tmp.";
 
+  public static Path toTaskTempPath(Path orig) {
+    if (orig.getName().indexOf(taskTmpPrefix) == 0) {
+      return orig;
+    }
+    return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
+  }
+
+  public static Path toTaskTempPath(String orig) {
+    return toTaskTempPath(new Path(orig));
+  }
+
   public static Path toTempPath(Path orig) {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
       return orig;
@@ -1211,6 +1223,7 @@
 
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
+    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
         + ".intermediate");
     Path finalPath = new Path(specPath);
@@ -1236,6 +1249,7 @@
     } else {
       fs.delete(tmpPath, true);
     }
+    fs.delete(taskTmpPath, true);
   }
 
   /**
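Note on the ExecDriver and Utilities changes: createTmpDirs walks the operator tree breadth-first and pre-creates each FileSink's _tmp. directory before the job is submitted, so the task-side rename in commit() always has an existing target even though RCFileOutputFormat no longer calls mkdirs (next hunk). The new toTaskTempPath helper mirrors toTempPath, including the guard that makes it a no-op on a path that already carries the prefix, and mvFileToFinalPath now also removes the _task_tmp. tree when the job closes. A quick check of the expected helper behavior, on a hypothetical directory:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class ToTaskTempPathCheck {
      public static void main(String[] args) {
        Path dir = new Path("/tmp/hive-user/10000"); // hypothetical FileSink dir

        Path taskTmp = Utilities.toTaskTempPath(dir);
        System.out.println(taskTmp); // /tmp/hive-user/_task_tmp.10000

        // The prefix guard makes a second application a no-op.
        System.out.println(Utilities.toTaskTempPath(taskTmp));
        // /tmp/hive-user/_task_tmp.10000
      }
    }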
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java	(revision 1148905)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java	(working copy)
@@ -77,9 +77,6 @@
 
     Path outputPath = getWorkOutputPath(job);
     FileSystem fs = outputPath.getFileSystem(job);
-    if (!fs.exists(outputPath)) {
-      fs.mkdirs(outputPath);
-    }
     Path file = new Path(outputPath, name);
     CompressionCodec codec = null;
     if (getCompressOutput(job)) {
Index: ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java	(revision 1148905)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java	(working copy)
@@ -61,6 +61,7 @@
   boolean hasDynamicPartitions = false;
   boolean tmpPathFixed = false;
   Path tmpPath;
+  Path taskTmpPath;
   Path dpPath;
 
   public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
@@ -68,6 +69,7 @@
   public RCFileMergeMapper() {
   }
 
+  @Override
   public void configure(JobConf job) {
     jc = job;
     hasDynamicPartitions = HiveConf.getBoolVar(job,
@@ -75,7 +77,9 @@
 
     String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
         .toString();
-    updatePaths(Utilities.toTempPath(specPath));
+    Path tmpPath = Utilities.toTempPath(specPath);
+    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+    updatePaths(tmpPath, taskTmpPath);
     try {
       fs = (new Path(specPath)).getFileSystem(job);
       autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
@@ -86,11 +90,12 @@
     }
   }
 
-  private void updatePaths(Path tmpPath) {
+  private void updatePaths(Path tmpPath, Path taskTmpPath) {
     String taskId = Utilities.getTaskId(jc);
     this.tmpPath = tmpPath;
+    this.taskTmpPath = taskTmpPath;
     finalPath = new Path(tmpPath, taskId);
-    outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+    outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));
   }
 
   @Override
@@ -163,9 +168,10 @@
    *
    * @param inputPath
    * @throws HiveException
+   * @throws IOException
    */
   private void fixTmpPath(Path inputPath)
-      throws HiveException {
+      throws HiveException, IOException {
     dpPath = inputPath;
     Path newPath = new Path(".");
     int inputDepth = inputPath.depth();
@@ -177,9 +183,16 @@
       inputDepth--;
       inputPath = inputPath.getParent();
     }
-    updatePaths(new Path(tmpPath, newPath));
+
+    Path newTmpPath = new Path(tmpPath, newPath);
+    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+    if (!fs.exists(newTmpPath)) {
+      fs.mkdirs(newTmpPath);
+    }
+    updatePaths(newTmpPath, newTaskTmpPath);
   }
 
+  @Override
   public void close() throws IOException {
     // close writer
     if (outWriter == null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java	(revision 1148905)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java	(working copy)
@@ -75,6 +75,7 @@
     jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
 
+  @Override
   public boolean requireLock() {
     return true;
   }
@@ -140,6 +141,17 @@
     }
 
     String outputPath = this.work.getOutputDir();
+    Path tempOutPath = Utilities.toTempPath(new Path(outputPath));
+    try {
+      FileSystem fs = tempOutPath.getFileSystem(job);
+      if (!fs.exists(tempOutPath)) {
+        fs.mkdirs(tempOutPath);
+      }
+    } catch (IOException e) {
+      console.printError("Can't make path " + outputPath + " : " + e.getMessage());
+      return 6;
+    }
+
     RCFileBlockMergeOutputFormat.setMergeOutputPath(job, new Path(outputPath));
     job.setOutputKeyClass(NullWritable.class);
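Note on the merge-path changes: RCFileMergeMapper keeps the same two-tree separation, with in-progress output under _task_tmp.<dir> and committed files under _tmp.<dir>, and fixTmpPath appends the dynamic-partition suffix to both trees. BlockMergeTask pre-creates the _tmp. directory up front, which is the single-job analogue of createTmpDirs above. A sketch of what the reworked updatePaths computes, assuming a merge output dir of /warehouse/t/10000 and a hypothetical task id 000000_0:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class MergeMapperPathSketch {
      public static void main(String[] args) {
        String specPath = "/warehouse/t/10000"; // hypothetical
        String taskId = "000000_0";             // hypothetical

        Path tmpPath = Utilities.toTempPath(specPath);         // /warehouse/t/_tmp.10000
        Path taskTmpPath = Utilities.toTaskTempPath(specPath); // /warehouse/t/_task_tmp.10000

        // What updatePaths(tmpPath, taskTmpPath) derives:
        Path finalPath = new Path(tmpPath, taskId);
        Path outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));

        System.out.println(finalPath); // /warehouse/t/_tmp.10000/000000_0
        System.out.println(outPath);   // /warehouse/t/_task_tmp.10000/_tmp.000000_0
      }
    }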