Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1548974)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -129,34 +129,12 @@
      * Update OutPath according to tmpPath.
      */
     public Path getTaskOutPath(String taskId) {
-      return getOutPath(taskId, this.taskOutputTempPath);
+      return new Path(this.taskOutputTempPath, Utilities.toTempPath(taskId));
     }
 
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getOutPath(String taskId) {
-      return getOutPath(taskId, this.tmpPath);
-    }
-
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getOutPath(String taskId, Path tmp) {
-      return new Path(tmp, Utilities.toTempPath(taskId));
-    }
-
-    /**
      * Update the final paths according to tmpPath.
      */
-    public Path getFinalPath(String taskId) {
-      return getFinalPath(taskId, this.tmpPath, null);
-    }
-
-    /**
-     * Update the final paths according to tmpPath.
-     */
     public Path getFinalPath(String taskId, Path tmpPath, String extension) {
       if (extension != null) {
         return new Path(tmpPath, taskId + extension);
@@ -229,8 +207,6 @@
   private static final long serialVersionUID = 1L;
   protected transient FileSystem fs;
   protected transient Serializer serializer;
-  protected transient BytesWritable commonKey = new BytesWritable();
-  protected transient TableIdEnum tabIdEnum = null;
   protected transient LongWritable row_count;
   private transient boolean isNativeTable = true;
 
@@ -255,28 +231,6 @@
   private transient int timeOut; // JT timeout in msec.
   private transient long lastProgressReport = System.currentTimeMillis();
 
-  /**
-   * TableIdEnum.
-   *
-   */
-  public static enum TableIdEnum {
-    TABLE_ID_1_ROWCOUNT,
-    TABLE_ID_2_ROWCOUNT,
-    TABLE_ID_3_ROWCOUNT,
-    TABLE_ID_4_ROWCOUNT,
-    TABLE_ID_5_ROWCOUNT,
-    TABLE_ID_6_ROWCOUNT,
-    TABLE_ID_7_ROWCOUNT,
-    TABLE_ID_8_ROWCOUNT,
-    TABLE_ID_9_ROWCOUNT,
-    TABLE_ID_10_ROWCOUNT,
-    TABLE_ID_11_ROWCOUNT,
-    TABLE_ID_12_ROWCOUNT,
-    TABLE_ID_13_ROWCOUNT,
-    TABLE_ID_14_ROWCOUNT,
-    TABLE_ID_15_ROWCOUNT;
-  }
-
   protected transient boolean autoDelete = false;
   protected transient JobConf jc;
   Class<? extends Writable> outputClass;
@@ -356,14 +310,7 @@
         prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
             jc.getPartitionerClass(), null);
       }
-      int id = conf.getDestTableId();
-      if ((id != 0) && (id <= TableIdEnum.values().length)) {
-        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
-        tabIdEnum = TableIdEnum.valueOf(enumName);
-        row_count = new LongWritable();
-        statsMap.put(tabIdEnum, row_count);
-      }
-
+      row_count = new LongWritable();
       if (dpCtx != null) {
         dpSetup();
       }
@@ -478,7 +425,7 @@
         taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
       }
       if (isNativeTable) {
-        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
+        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
         LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
         fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
         LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
@@ -803,20 +750,6 @@
     return FileUtils.makePartName(dpColNames, row);
   }
 
-  private String getPartitionSpec(Path path, int level) {
-    Stack<String> st = new Stack<String>();
-    Path p = path;
-    for (int i = 0; i < level; ++i) {
-      st.push(p.getName());
-      p = p.getParent();
-    }
-    StringBuilder sb = new StringBuilder();
-    while (!st.empty()) {
-      sb.append(st.pop());
-    }
-    return sb.toString();
-  }
-
   @Override
   public void closeOp(boolean abort) throws HiveException {
     if (!bDynParts && !filesCreated) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1548974)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -1623,27 +1623,20 @@
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
-    Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
-        + ".intermediate");
     Path finalPath = new Path(specPath);
     if (success) {
       if (fs.exists(tmpPath)) {
-        // Step1: rename tmp output folder to intermediate path. After this
-        // point, updates from speculative tasks still writing to tmpPath
-        // will not appear in finalPath.
-        log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
-        Utilities.rename(fs, tmpPath, intermediatePath);
-        // Step2: remove any tmp file or double-committed output files
+        // remove any tmp file or double-committed output files
         ArrayList<String> emptyBuckets =
-            Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+            Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
-        // Step3: move to the file destination
-        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
-        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+        // move to the file destination
+        log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
+        Utilities.renameOrMoveFiles(fs, tmpPath, finalPath);
       }
     } else {
       fs.delete(tmpPath, true);