Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1134223)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -50,10 +50,10 @@
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -98,6 +98,7 @@
   public class FSPaths implements Cloneable {
     Path tmpPath;
+    Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
@@ -108,6 +109,7 @@
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);
+      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
@@ -127,6 +129,14 @@
     /**
      * Update OutPath according to tmpPath.
      */
+    public Path getTaskOutPath(String taskId) {
+      return getOutPath(taskId, this.taskOutputTempPath);
+    }
+
+
+    /**
+     * Update OutPath according to tmpPath.
+     */
     public Path getOutPath(String taskId) {
       return getOutPath(taskId, this.tmpPath);
     }
@@ -180,14 +190,17 @@
     private void commit(FileSystem fs) throws HiveException {
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
+          if (bDynParts && !fs.exists(finalPaths[idx].getParent())) {
+            fs.mkdirs(finalPaths[idx].getParent());
+          }
           if (!fs.rename(outPaths[idx], finalPaths[idx])) {
-            throw new HiveException("Unable to rename output to: "
-                + finalPaths[idx]);
+            throw new HiveException("Unable to rename output from: "
+                + outPaths[idx] + " to: " + finalPaths[idx]);
           }
           updateProgress();
         } catch (IOException e) {
-          throw new HiveException(e + "Unable to rename output to: "
-              + finalPaths[idx]);
+          throw new HiveException("Unable to rename output from: "
+              + outPaths[idx] + " to: " + finalPaths[idx], e);
         }
       }
     }
@@ -423,7 +436,7 @@
       if (isNativeTable) {
         fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
         LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-        fsp.outPaths[filesIdx] = fsp.getOutPath(taskId);
+        fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
         LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
       } else {
         fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
@@ -614,6 +627,7 @@
       }
       fsp2 = new FSPaths(specPath);
       fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
+      fsp2.taskOutputTempPath = new Path(fsp2.taskOutputTempPath, dpDir);
      createBucketFiles(fsp2);
       valToPaths.put(dpDir, fsp2);
     }
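Reviewer note: a minimal standalone sketch of the commit sequence the FileSinkOperator changes above implement. The class and method names below are hypothetical (not Hive code); the _tmp./_task_tmp. prefixes are the ones defined in the Utilities.java hunk further down. Each task writes under the task-scoped _task_tmp.<dir> and only renames into the shared _tmp.<dir> on commit, so output from a failed or speculative task attempt is never swept up by the job-level move to the final directory.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch of the per-task commit flow after this patch.
    public class TaskCommitSketch {
      public static void commitOne(FileSystem fs, Path specPath, String taskId)
          throws IOException {
        Path tmpPath = new Path(specPath.getParent(), "_tmp." + specPath.getName());
        Path taskTmpPath = new Path(specPath.getParent(), "_task_tmp." + specPath.getName());

        Path outPath = new Path(taskTmpPath, taskId); // written while the task runs
        Path finalPath = new Path(tmpPath, taskId);   // visible to the job commit

        // Mirrors FSPaths.commit(): with dynamic partitions the _tmp. partition
        // directory may not exist yet, so create it before renaming.
        if (!fs.exists(finalPath.getParent())) {
          fs.mkdirs(finalPath.getParent());
        }
        // Promote the task output with a single rename.
        if (!fs.rename(outPath, finalPath)) {
          throw new IOException("Unable to rename output from: " + outPath
              + " to: " + finalPath);
        }
      }
    }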
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1134223)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -106,6 +106,7 @@
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
+
   @Override
   public boolean requireLock() {
     return true;
   }
@@ -203,6 +204,44 @@
     return false;
   }
 
+  protected void CreateTmpDirs() throws IOException {
+    // fix up outputs
+    Map<String, ArrayList<String>> pa = work.getPathToAliases();
+    if (pa != null) {
+      ArrayList<Operator<? extends Serializable>> opList =
+          new ArrayList<Operator<? extends Serializable>>();
+
+      if (work.getReducer() != null) {
+        opList.add(work.getReducer());
+      }
+
+      for (List<String> ls : pa.values()) {
+        for (String a : ls) {
+          opList.add(work.getAliasToWork().get(a));
+
+          while (!opList.isEmpty()) {
+            Operator<? extends Serializable> op = opList.remove(0);
+
+            if (op instanceof FileSinkOperator) {
+              FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+              String tempDir = fdesc.getDirName();
+
+              if (tempDir != null) {
+                Path tempPath = Utilities.toTempPath(new Path(tempDir));
+                LOG.info("Making Temp Directory: " + tempDir);
+                FileSystem fs = tempPath.getFileSystem(job);
+                fs.mkdirs(tempPath);
+              }
+            }
+
+            if (op.getChildOperators() != null) {
+              opList.addAll(op.getChildOperators());
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Execute a query plan using Hadoop.
    */
@@ -403,6 +442,8 @@
       }
     }
 
+    this.CreateTmpDirs();
+
     // Finally SUBMIT the JOB!
     rj = jc.submitJob(job);
     // replace it back
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1134223)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -117,8 +117,8 @@
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -133,8 +133,8 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -1012,7 +1012,19 @@
   }
 
   private static final String tmpPrefix = "_tmp.";
+  private static final String taskTmpPrefix = "_task_tmp.";
 
+  public static Path toTaskTempPath(Path orig) {
+    if (orig.getName().indexOf(taskTmpPrefix) == 0) {
+      return orig;
+    }
+    return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
+  }
+
+  public static Path toTaskTempPath(String orig) {
+    return toTaskTempPath(new Path(orig));
+  }
+
   public static Path toTempPath(Path orig) {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
       return orig;
     }
@@ -1211,6 +1223,7 @@
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
+    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
         + ".intermediate");
     Path finalPath = new Path(specPath);
@@ -1236,6 +1249,7 @@
     } else {
       fs.delete(tmpPath, true);
     }
+    fs.delete(taskTmpPath, true);
   }
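Reviewer note: a hypothetical demo (not part of the patch) of how the two Utilities helpers derive sibling scratch directories from the same output path; the warehouse path used here is made up.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    // Shows the naming scheme: _tmp.<dir> and _task_tmp.<dir> are siblings
    // of the final output directory, never nested inside it.
    public class TempPathDemo {
      public static void main(String[] args) {
        Path spec = new Path("/warehouse/t/ds=1"); // example path, made up

        System.out.println(Utilities.toTempPath(spec));     // /warehouse/t/_tmp.ds=1
        System.out.println(Utilities.toTaskTempPath(spec)); // /warehouse/t/_task_tmp.ds=1

        // Both helpers are idempotent: an already-prefixed path is returned as-is.
        System.out.println(Utilities.toTaskTempPath(Utilities.toTaskTempPath(spec)));
      }
    }

Note the cleanup change at the end of the Utilities.java hunk: once the job-level move has run, both the _tmp. and the new _task_tmp. directories are deleted, so leftover output from uncommitted task attempts is discarded rather than published.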
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (revision 1134223)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (working copy)
@@ -77,9 +77,6 @@
     Path outputPath = getWorkOutputPath(job);
     FileSystem fs = outputPath.getFileSystem(job);
-    if (!fs.exists(outputPath)) {
-      fs.mkdirs(outputPath);
-    }
     Path file = new Path(outputPath, name);
     CompressionCodec codec = null;
     if (getCompressOutput(job)) {
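Reviewer note: the mkdirs() removed here is made redundant by CreateTmpDirs() in the ExecDriver.java hunk, which creates every FileSinkOperator scratch directory on the client before the job is submitted, instead of having each task race to create it. Roughly, for a single sink (hypothetical helper name, same FileSystem calls as the patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical helper mirroring CreateTmpDirs(): pre-create one sink's
    // _tmp. directory at submit time, so record writers such as
    // RCFileOutputFormat.getRecordWriter() can assume it already exists.
    public class PreCreateTmpDir {
      public static void ensureTmpDir(JobConf job, String outputDir) throws IOException {
        Path tempPath = Utilities.toTempPath(new Path(outputDir));
        FileSystem fs = tempPath.getFileSystem(job);
        fs.mkdirs(tempPath); // returns true even if the directory already exists
      }
    }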