Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1081293)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -100,8 +100,8 @@
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.Constants;
@@ -110,8 +110,8 @@
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -176,8 +176,10 @@
 
     MapredWork gWork = null;
     try {
       String jobID = getHiveJobID(job);
+      assert jobID != null;
       gWork = gWorkMap.get(jobID);
+
       if (gWork == null) {
         String jtConf = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT);
         String path;
@@ -187,6 +189,7 @@
       } else {
         path = "HIVE_PLAN" + jobID;
       }
+
       InputStream in = new FileInputStream(path);
       MapredWork ret = deserializeMapRedWork(in, job);
       gWork = ret;
@@ -331,7 +334,8 @@
       if (!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
         // Set up distributed cache
         DistributedCache.createSymlink(job);
-        String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
+        String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + getHiveJobID(job);
+
         DistributedCache.addCacheFile(new URI(uriWithLink), job);
 
         // set replication of the plan file to a high number. we use the same
@@ -342,7 +346,7 @@
 
       // Cache the plan in this process
       w.initialize();
-      gWorkMap.put(jobID, w);
+      gWorkMap.put(getHiveJobID(job), w);
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);
@@ -402,8 +406,8 @@
       e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
 
       e.writeObject(t);
-    }finally {
-      if(null != e){
+    } finally {
+      if (null != e) {
         e.close();
       }
     }
@@ -1142,7 +1146,8 @@
   }
 
   public static void mvFileToFinalPath(String specPath, Configuration hconf,
-      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException, HiveException {
+      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException,
+      HiveException {
 
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
@@ -1158,7 +1163,7 @@
         Utilities.rename(fs, tmpPath, intermediatePath);
         // Step2: remove any tmp file or double-committed output files
         ArrayList<String> emptyBuckets =
-            Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+          Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf);
@@ -1176,13 +1181,18 @@
   /**
    * Check the existence of buckets according to bucket specification. Create empty buckets if
    * needed.
-   * @param specPath The final path where the dynamic partitions should be in.
-   * @param conf FileSinkDesc.
-   * @param dpCtx dynamic partition context.
+   *
+   * @param specPath
+   *          The final path where the dynamic partitions should be in.
+   * @param conf
+   *          FileSinkDesc.
+   * @param dpCtx
+   *          dynamic partition context.
    * @throws HiveException
    * @throws IOException
    */
-  private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths, FileSinkDesc conf)
+  private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths,
+      FileSinkDesc conf)
       throws HiveException, IOException {
 
     JobConf jc;
@@ -1209,7 +1219,7 @@
       throw new HiveException(e);
     }
 
-    for (String p: paths) {
+    for (String p : paths) {
       Path path = new Path(p);
       RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
@@ -1538,14 +1548,15 @@
           .getInputFileFormatClass();
       InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
           inputFormatCls, jobConf);
-      if(inputFormatObj instanceof ContentSummaryInputFormat) {
+      if (inputFormatObj instanceof ContentSummaryInputFormat) {
         cs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p, jobConf);
       } else {
         FileSystem fs = p.getFileSystem(ctx.getConf());
         cs = fs.getContentSummary(p);
       }
       ctx.addCS(path, cs);
-      LOG.info("Cache Content Summary for " + path + " length: " + cs.getLength() + " file count: "
+      LOG.info("Cache Content Summary for " + path + " length: " + cs.getLength()
+          + " file count: "
           + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
     }
 
@@ -1561,7 +1572,7 @@
   }
 
   public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
-    throws Exception {
+      throws Exception {
     ContentSummary cs = ctx.getCS(dirPath);
     if (cs != null) {
       LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: "
@@ -1712,17 +1723,17 @@
   }
 
   public static String generateTarURI(String baseURI, String filename) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename+".tar.gz");
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
     return tmpFileURI;
   }
 
   public static String generateTarURI(Path baseURI, String filename) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename+".tar.gz");
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
     return tmpFileURI;
   }
 
   public static String generateTarFileName(String name) {
-    String tmpFileURI = new String(name+".tar.gz");
+    String tmpFileURI = new String(name + ".tar.gz");
     return tmpFileURI;
   }
 
@@ -1738,7 +1749,7 @@
   }
 
   public static double showTime(long time) {
-    double result = (double) time / (double)1000;
+    double result = (double) time / (double) 1000;
     return result;
   }
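For context on the plan-shipping hunks above (the "#HIVE_PLAN" symlink URI and the gWorkMap entries keyed by getHiveJobID(job)), the following is a minimal sketch of the Hadoop DistributedCache mechanism the patch relies on. The class name PlanShippingSketch, the helper names shipPlan/openPlan, and the hiveJobID/planPath parameters are illustrative stand-ins, not part of this patch; DistributedCache.createSymlink and DistributedCache.addCacheFile are the actual Hadoop APIs invoked in the diff.

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.net.URI;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class PlanShippingSketch {

      // Client side: register the serialized plan file so each task sees it
      // as a symlink named "HIVE_PLAN<jobID>" in its working directory.
      // (hiveJobID stands in for whatever Utilities.getHiveJobID(job) returns.)
      static void shipPlan(JobConf job, Path planPath, String hiveJobID) throws Exception {
        DistributedCache.createSymlink(job);
        // The URI fragment after '#' becomes the symlink name on the task side.
        URI uriWithLink = new URI(planPath.toUri().toString() + "#HIVE_PLAN" + hiveJobID);
        DistributedCache.addCacheFile(uriWithLink, job);
      }

      // Task side: open the localized plan through the framework-created
      // symlink, mirroring the FileInputStream(path) call in getMapRedWork.
      static InputStream openPlan(String hiveJobID) throws Exception {
        return new FileInputStream("HIVE_PLAN" + hiveJobID);
      }
    }

Hadoop materializes the cached file in each task's working directory under the fragment name, which is why getMapRedWork in the first hunks can open the plan with a plain relative-path FileInputStream when the job is not running locally; keying the symlink and the in-process gWorkMap cache by the Hive job ID keeps concurrent jobs from clobbering each other's plans.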