diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 413caf5..5f80712 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -316,40 +316,13 @@ public Path getMRTmpPath() {
         nextPathId());
   }
 
-
-  /**
-   * Given a URI for mapreduce intermediate output, swizzle the
-   * it to point to the local file system. This can be called in
-   * case the caller decides to run in local mode (in which case
-   * all intermediate data can be stored locally)
-   *
-   * @param originalURI uri to localize
-   * @return localized path for map-red intermediate data
-   */
-  public String localizeMRTmpFileURI(String originalURI) {
-    Path o = new Path(originalURI);
-    Path mrbase = getMRScratchDir();
-
-    URI relURI = mrbase.toUri().relativize(o.toUri());
-    if (relURI.equals(o.toUri())) {
-      throw new RuntimeException
-          ("Invalid URI: " + originalURI + ", cannot relativize against" +
-              mrbase.toString());
-    }
-
-    return getLocalScratchDir(!explain) + Path.SEPARATOR +
-        relURI.getPath();
-  }
-
-
   /**
    * Get a tmp path on local host to store intermediate data.
    *
    * @return next available tmp path on local fs
    */
-  public String getLocalTmpFileURI() {
-    return getLocalScratchDir(true) + Path.SEPARATOR + LOCAL_PREFIX +
-        nextPathId();
+  public Path getLocalTmpPath() {
+    return new Path(getLocalScratchDir(true), LOCAL_PREFIX + nextPathId());
   }
 
   /**
@@ -595,38 +568,6 @@ public Configuration getConf() {
   }
 
   /**
-   * Given a mapping from paths to objects, localize any MR tmp paths
-   * @param map mapping from paths to objects
-   */
-  public void localizeKeys(Map<String, Object> map) {
-    for (Map.Entry<String, Object> entry: map.entrySet()) {
-      String path = entry.getKey();
-      if (isMRTmpFileURI(path)) {
-        Object val = entry.getValue();
-        map.remove(path);
-        map.put(localizeMRTmpFileURI(path), val);
-      }
-    }
-  }
-
-  /**
-   * Given a list of paths, localize any MR tmp paths contained therein
-   * @param paths list of paths to be localized
-   */
-  public void localizePaths(List<String> paths) {
-    Iterator<String> iter = paths.iterator();
-    List<String> toAdd = new ArrayList<String>();
-    while (iter.hasNext()) {
-      String path = iter.next();
-      if (isMRTmpFileURI(path)) {
-        iter.remove();
-        toAdd.add(localizeMRTmpFileURI(path));
-      }
-    }
-    paths.addAll(toAdd);
-  }
-
-  /**
    * @return the isHDFSCleanup
    */
   public boolean isHDFSCleanup() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index a314ce7..50afe55 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -816,10 +816,10 @@ public void jobCloseOp(Configuration hconf, boolean success)
       throws HiveException {
     try {
       if ((conf != null) && isNativeTable) {
-        String specPath = conf.getDirName().toString();
+        Path specPath = conf.getDirName();
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
         if (conf.isLinkedFileSink() && (dpCtx != null)) {
-          specPath = conf.getParentDir().toString();
+          specPath = conf.getParentDir();
         }
         Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
           reporter);
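A note on the idiom this patch standardizes on: building child paths with new Path(parent, child) instead of concatenating strings with Path.SEPARATOR. A minimal sketch (class name and sample values are illustrative, not from the patch):

import org.apache.hadoop.fs.Path;

// Illustrative only: contrasts the old string-concatenation idiom with the
// typed Path(parent, child) constructor that this patch adopts.
public class PathJoinSketch {
  public static void main(String[] args) {
    String scratchDir = "file:/tmp/hive/scratch/";   // note the trailing slash
    // Old idiom: plain concatenation keeps whatever separators happen to be there.
    String byString = scratchDir + Path.SEPARATOR + "-local-10001";
    // New idiom: the Path constructor normalizes separators up front and the
    // value stays strongly typed all the way to the FileSystem call.
    Path byPath = new Path(new Path(scratchDir), "-local-10001");
    System.out.println(byString); // file:/tmp/hive/scratch//-local-10001
    System.out.println(byPath);   // file:/tmp/hive/scratch/-local-10001
  }
}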
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 3e17ae7..6e22b02 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -150,7 +150,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
     if (conf.getHandleSkewJoin()) {
       try {
         for (int i = 0; i < numAliases; i++) {
-          String specPath = conf.getBigKeysDirMap().get((byte) i);
+          Path specPath = conf.getBigKeysDirMap().get((byte) i);
           mvFileToFinalPath(specPath, hconf, success, LOG);
           for (int j = 0; j < numAliases; j++) {
             if (j == i) {
@@ -165,7 +165,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
         if (success) {
           // move up files
           for (int i = 0; i < numAliases; i++) {
-            String specPath = conf.getBigKeysDirMap().get((byte) i);
+            Path specPath = conf.getBigKeysDirMap().get((byte) i);
             moveUpFiles(specPath, hconf, LOG);
             for (int j = 0; j < numAliases; j++) {
               if (j == i) {
@@ -184,16 +184,15 @@ public void jobCloseOp(Configuration hconf, boolean success)
     super.jobCloseOp(hconf, success);
   }
 
-  private void moveUpFiles(String specPath, Configuration hconf, Log log)
+  private void moveUpFiles(Path specPath, Configuration hconf, Log log)
       throws IOException, HiveException {
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
-    Path finalPath = new Path(specPath);
+    FileSystem fs = specPath.getFileSystem(hconf);
 
-    if (fs.exists(finalPath)) {
-      FileStatus[] taskOutputDirs = fs.listStatus(finalPath);
+    if (fs.exists(specPath)) {
+      FileStatus[] taskOutputDirs = fs.listStatus(specPath);
       if (taskOutputDirs != null) {
         for (FileStatus dir : taskOutputDirs) {
-          Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath);
+          Utilities.renameOrMoveFiles(fs, dir.getPath(), specPath);
           fs.delete(dir.getPath(), true);
         }
       }
@@ -210,15 +209,13 @@ private void moveUpFiles(String specPath, Configuration hconf, Log log)
    * @throws IOException
    * @throws HiveException
    */
-  private void mvFileToFinalPath(String specPath, Configuration hconf,
+  private void mvFileToFinalPath(Path specPath, Configuration hconf,
       boolean success, Log log) throws IOException, HiveException {
 
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
         + ".intermediate");
-    Path finalPath = new Path(specPath);
-    ArrayList<String> emptyBuckets = null;
     if (success) {
       if (fs.exists(tmpPath)) {
         // Step1: rename tmp output folder to intermediate path. After this
@@ -229,8 +226,8 @@ private void mvFileToFinalPath(String specPath, Configuration hconf,
         // Step2: remove any tmp file or double-committed output files
         Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
         // Step3: move to the file destination
-        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
-        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+        log.info("Moving tmp dir: " + intermediatePath + " to: " + specPath);
+        Utilities.renameOrMoveFiles(fs, intermediatePath, specPath);
       }
     } else {
       fs.delete(tmpPath, true);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
index 190cd93..3bf58f6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
@@ -177,7 +177,7 @@ public void initiliaze(Configuration hconf) {
 
   void endGroup() throws IOException, HiveException {
     if (skewKeyInCurrentGroup) {
-      String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
+      Path specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
       RowContainer<ArrayList<Object>> bigKey = (RowContainer)joinOp.storage[currBigKeyTag];
       Path outputPath = getOperatorOutputPath(specPath);
       FileSystem destFs = outputPath.getFileSystem(hconf);
@@ -258,7 +258,7 @@ public void close(boolean abort) throws HiveException {
       }
 
       try {
-        String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
+        Path specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
         Path bigKeyPath = getOperatorOutputPath(specPath);
         FileSystem fs = bigKeyPath.getFileSystem(hconf);
         delete(bigKeyPath, fs);
@@ -295,7 +295,7 @@ private void commit() throws IOException {
         continue;
       }
 
-      String specPath = conf.getBigKeysDirMap().get(
+      Path specPath = conf.getBigKeysDirMap().get(
           Byte.valueOf((byte) bigKeyTbl));
       commitOutputPathToFinalPath(specPath, false);
       for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
@@ -311,7 +311,7 @@ private void commit() throws IOException {
     }
   }
 
-  private void commitOutputPathToFinalPath(String specPath,
+  private void commitOutputPathToFinalPath(Path specPath,
       boolean ignoreNonExisting) throws IOException {
     Path outPath = getOperatorOutputPath(specPath);
     Path finalPath = getOperatorFinalPath(specPath);
@@ -334,14 +334,12 @@ private void commitOutputPathToFinalPath(String specPath,
     }
   }
 
-  private Path getOperatorOutputPath(String specPath) throws IOException {
-    Path tmpPath = Utilities.toTempPath(specPath);
-    return new Path(tmpPath, Utilities.toTempPath(taskId));
+  private Path getOperatorOutputPath(Path specPath) throws IOException {
+    return new Path(Utilities.toTempPath(specPath), Utilities.toTempPath(taskId));
   }
 
-  private Path getOperatorFinalPath(String specPath) throws IOException {
-    Path tmpPath = Utilities.toTempPath(specPath);
-    return new Path(tmpPath, taskId);
+  private Path getOperatorFinalPath(Path specPath) throws IOException {
+    return new Path(Utilities.toTempPath(specPath), taskId);
   }
 
   public void setSkewJoinJobCounter(LongWritable skewjoinFollowupJobs) {
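For orientation, the three-step commit that JoinOperator.mvFileToFinalPath performs above can be condensed as follows; the Utilities helpers are the ones referenced by the patch itself, while the class and the abbreviated control flow are only a sketch (the real code also logs the final move and, on failure, deletes tmpPath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;

// Condensed sketch of the rename-through-intermediate commit sequence.
class SkewOutputCommitSketch {
  static void commit(Path specPath, Configuration hconf) throws Exception {
    FileSystem fs = specPath.getFileSystem(hconf);
    Path tmpPath = Utilities.toTempPath(specPath);
    Path intermediatePath = new Path(tmpPath.getParent(),
        tmpPath.getName() + ".intermediate");
    if (fs.exists(tmpPath)) {
      // Step 1: rename the tmp output folder to the intermediate path,
      // keeping partial output hidden from readers of specPath.
      fs.rename(tmpPath, intermediatePath);
      // Step 2: remove any tmp files or double-committed task outputs.
      Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
      // Step 3: publish to the final destination.
      Utilities.renameOrMoveFiles(fs, intermediatePath, specPath);
    }
  }
}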
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9aba70f..b9b5b4a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1424,10 +1424,6 @@ public static Path toTaskTempPath(Path orig) {
     return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
   }
 
-  public static Path toTaskTempPath(String orig) {
-    return toTaskTempPath(new Path(orig));
-  }
-
   public static Path toTempPath(Path orig) {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
       return orig;
@@ -1686,15 +1682,14 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI
     }
   }
 
-  public static void mvFileToFinalPath(String specPath, Configuration hconf,
+  public static void mvFileToFinalPath(Path specPath, Configuration hconf,
       boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException, HiveException {
 
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
-    Path finalPath = new Path(specPath);
     if (success) {
       if (fs.exists(tmpPath)) {
         // remove any tmp file or double-committed output files
@@ -1706,8 +1701,8 @@ public static void mvFileToFinalPath(String specPath, Configuration hconf,
       }
 
       // move to the file destination
-      log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
-      Utilities.renameOrMoveFiles(fs, tmpPath, finalPath);
+      log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+      Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
     }
   } else {
     fs.delete(tmpPath, true);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index bce41cf..22e5777 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -739,7 +739,7 @@ public static void main(String[] args) throws IOException, HiveException {
   public static String generateCmdLine(HiveConf hconf, Context ctx)
       throws IOException {
     HiveConf tempConf = new HiveConf();
-    Path hConfFilePath = new Path(ctx.getLocalTmpFileURI(), JOBCONF_FILENAME);
+    Path hConfFilePath = new Path(ctx.getLocalTmpPath(), JOBCONF_FILENAME);
     OutputStream out = null;
 
     Properties deltaP = hconf.getChangedProperties();
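The kept helpers above derive staging directories by prefixing the final name. A standalone approximation follows; the literal prefixes "_tmp." and "_task_tmp." are assumptions matching the tmpPrefix and taskTmpPrefix constants in the Hive source of this era, and the class is illustrative:

import org.apache.hadoop.fs.Path;

// Sketch of the naming scheme behind Utilities.toTempPath/toTaskTempPath.
// The prefix literals are assumptions; Hive keeps them in private constants.
class TempPathSketch {
  static Path toTempPath(Path orig) {
    if (orig.getName().indexOf("_tmp.") == 0) {
      return orig; // already a temp path; mirrors the check kept in the hunk above
    }
    return new Path(orig.getParent(), "_tmp." + orig.getName());
  }

  static Path toTaskTempPath(Path orig) {
    return new Path(orig.getParent(), "_task_tmp." + orig.getName());
  }

  public static void main(String[] args) {
    Path out = new Path("hdfs://nn/warehouse/t/ds=1");
    System.out.println(toTempPath(out));     // hdfs://nn/warehouse/t/_tmp.ds=1
    System.out.println(toTaskTempPath(out)); // hdfs://nn/warehouse/t/_task_tmp.ds=1
  }
}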
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index a7e2253..8d1d52d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -169,7 +169,7 @@ public int execute(DriverContext driverContext) {
       String hiveConfArgs = generateCmdLine(conf, ctx);
 
       // write out the plan to a local file
-      Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+      Path planPath = new Path(ctx.getLocalTmpPath(), "plan.xml");
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
@@ -188,7 +188,7 @@ public int execute(DriverContext driverContext) {
 
       if (!files.isEmpty()) {
         cmdLine = cmdLine + " -files " + files;
-        workDir = (new Path(ctx.getLocalTmpFileURI())).toUri().getPath();
+        workDir = ctx.getLocalTmpPath().toUri().getPath();
 
         if (! (new File(workDir)).mkdir()) {
           throw new IOException ("Cannot create tmp working dir: " + workDir);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index fd0fcd7..2d2508d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -136,7 +136,7 @@ public int execute(DriverContext driverContext) {
       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
 
       // write out the plan to a local file
-      Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+      Path planPath = new Path(ctx.getLocalTmpPath(), "plan.xml");
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredLocalWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
@@ -157,7 +157,7 @@ public int execute(DriverContext driverContext) {
 
       if (!files.isEmpty()) {
         cmdLine = cmdLine + " -files " + files;
-        workDir = (new Path(ctx.getLocalTmpFileURI())).toUri().getPath();
+        workDir = ctx.getLocalTmpPath().toUri().getPath();
 
         if (!(new File(workDir)).mkdir()) {
           throw new IOException("Cannot create tmp working dir: " + workDir);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
index 2f23802..850e043 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
@@ -84,13 +84,12 @@ public void configure(JobConf job) {
       listBucketingDepth = HiveConf.getIntVar(job,
           HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
 
-      String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
-          .toString();
+      Path specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job);
       Path tmpPath = Utilities.toTempPath(specPath);
       Path taskTmpPath = Utilities.toTaskTempPath(specPath);
       updatePaths(tmpPath, taskTmpPath);
       try {
-        fs = (new Path(specPath)).getFileSystem(job);
+        fs = specPath.getFileSystem(job);
         autoDelete = fs.deleteOnExit(outPath);
       } catch (IOException e) {
         this.exception = true;
@@ -316,7 +315,7 @@ public static void jobClose(Path outputPath, boolean success, JobConf job,
       ) throws HiveException, IOException {
     FileSystem fs = outputPath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outputPath, job);
-    Utilities.mvFileToFinalPath(outputPath.toUri().toString(), job, success, LOG, dynPartCtx, null,
+    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
       reporter);
     fs.delete(backupPath, true);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index d297f0a..2aed3bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -234,7 +234,7 @@ public static void jobClose(Path outputPath, boolean success, JobConf job,
       ) throws HiveException, IOException {
     FileSystem fs = outputPath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outputPath, job);
-    Utilities.mvFileToFinalPath(outputPath.toUri().toString(), job, success, LOG, dynPartCtx, null,
+    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
       reporter);
     fs.delete(backupPath, true);
   }
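Both tasks above use the same conversion to hand a local-filesystem Path to java.io: go through the URI's path component. A small sketch with an illustrative value:

import java.io.File;
import org.apache.hadoop.fs.Path;

// Sketch of the conversion used in MapRedTask/MapredLocalTask above: a Path
// known to live on the local file system becomes a plain OS path string.
class LocalWorkDirSketch {
  public static void main(String[] args) {
    Path localTmp = new Path("file:/tmp/hive/scratch/-local-10004"); // illustrative
    String workDir = localTmp.toUri().getPath(); // "/tmp/hive/scratch/-local-10004"
    if (!new File(workDir).mkdir()) {
      System.err.println("Cannot create tmp working dir: " + workDir);
    }
  }
}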
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 7a4e2b3..db30729 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -123,21 +123,20 @@ public static void processSkewJoin(JoinOperator joinOp,
     Task<? extends Serializable> child =
         children != null && children.size() == 1 ? children.get(0) : null;
 
-    String baseTmpDir = parseCtx.getContext().getMRTmpPath().toUri().toString();
+    Path baseTmpDir = parseCtx.getContext().getMRTmpPath();
 
     JoinDesc joinDescriptor = joinOp.getConf();
     Map<Byte, List<ExprNodeDesc>> joinValues = joinDescriptor.getExprs();
     int numAliases = joinValues.size();
 
-    Map<Byte, String> bigKeysDirMap = new HashMap<Byte, String>();
-    Map<Byte, Map<Byte, String>> smallKeysDirMap = new HashMap<Byte, Map<Byte, String>>();
-    Map<Byte, String> skewJoinJobResultsDir = new HashMap<Byte, String>();
+    Map<Byte, Path> bigKeysDirMap = new HashMap<Byte, Path>();
+    Map<Byte, Map<Byte, Path>> smallKeysDirMap = new HashMap<Byte, Map<Byte, Path>>();
+    Map<Byte, Path> skewJoinJobResultsDir = new HashMap<Byte, Path>();
     Byte[] tags = joinDescriptor.getTagOrder();
     for (int i = 0; i < numAliases; i++) {
       Byte alias = tags[i];
-      String bigKeysDir = getBigKeysDir(baseTmpDir, alias);
-      bigKeysDirMap.put(alias, bigKeysDir);
-      Map<Byte, String> smallKeysMap = new HashMap<Byte, String>();
+      bigKeysDirMap.put(alias, getBigKeysDir(baseTmpDir, alias));
+      Map<Byte, Path> smallKeysMap = new HashMap<Byte, Path>();
       smallKeysDirMap.put(alias, smallKeysMap);
       for (Byte src2 : tags) {
         if (!src2.equals(alias)) {
@@ -154,8 +153,8 @@ public static void processSkewJoin(JoinOperator joinOp,
     joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(),
         HiveConf.ConfVars.HIVESKEWJOINKEY));
 
-    HashMap<String, Task<? extends Serializable>> bigKeysDirToTaskMap =
-      new HashMap<String, Task<? extends Serializable>>();
+    HashMap<Path, Task<? extends Serializable>> bigKeysDirToTaskMap =
+      new HashMap<Path, Task<? extends Serializable>>();
     List<Serializable> listWorks = new ArrayList<Serializable>();
     List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
     MapredWork currPlan = (MapredWork) currTask.getWork();
@@ -272,13 +271,13 @@ public static void processSkewJoin(JoinOperator joinOp,
       ArrayList<String> aliases = new ArrayList<String>();
       String alias = src.toString();
       aliases.add(alias);
-      String bigKeyDirPath = bigKeysDirMap.get(src);
-      newPlan.getPathToAliases().put(bigKeyDirPath, aliases);
+      Path bigKeyDirPath = bigKeysDirMap.get(src);
+      newPlan.getPathToAliases().put(bigKeyDirPath.toString(), aliases);
 
       newPlan.getAliasToWork().put(alias, tblScan_op);
       PartitionDesc part = new PartitionDesc(tableDescList.get(src), null);
 
-      newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
+      newPlan.getPathToPartitionInfo().put(bigKeyDirPath.toString(), part);
       newPlan.getAliasToPartnInfo().put(alias, part);
 
       Operator<? extends OperatorDesc> reducer = clonePlan.getReduceWork().getReducer();
@@ -297,7 +296,7 @@ public static void processSkewJoin(JoinOperator joinOp,
       MapredLocalWork localPlan = new MapredLocalWork(
           new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
           new LinkedHashMap<String, FetchWork>());
-      Map<Byte, String> smallTblDirs = smallKeysDirMap.get(src);
+      Map<Byte, Path> smallTblDirs = smallKeysDirMap.get(src);
 
       for (int j = 0; j < numAliases; j++) {
         if (j == i) {
@@ -306,7 +305,7 @@ public static void processSkewJoin(JoinOperator joinOp,
         Byte small_alias = tags[j];
         Operator<? extends OperatorDesc> tblScan_op2 = parentOps[j];
         localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2);
-        Path tblDir = new Path(smallTblDirs.get(small_alias));
+        Path tblDir = smallTblDirs.get(small_alias);
         localPlan.getAliasToFetchWork().put(small_alias.toString(),
             new FetchWork(tblDir, tableDescList.get(small_alias)));
       }
@@ -393,20 +392,19 @@ public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) {
   private static String SMALLKEYS = "smallkeys";
   private static String RESULTS = "results";
 
-  static String getBigKeysDir(String baseDir, Byte srcTbl) {
-    return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + BIGKEYS
-        + UNDERLINE + srcTbl;
+  static Path getBigKeysDir(Path baseDir, Byte srcTbl) {
+    return new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS + UNDERLINE + srcTbl);
   }
 
-  static String getBigKeysSkewJoinResultDir(String baseDir, Byte srcTbl) {
-    return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + BIGKEYS
-        + UNDERLINE + RESULTS + UNDERLINE + srcTbl;
+  static Path getBigKeysSkewJoinResultDir(Path baseDir, Byte srcTbl) {
+    return new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS
+        + UNDERLINE + RESULTS + UNDERLINE + srcTbl);
   }
 
-  static String getSmallKeysDir(String baseDir, Byte srcTblBigTbl,
+  static Path getSmallKeysDir(Path baseDir, Byte srcTblBigTbl,
       Byte srcTblSmallTbl) {
-    return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + SMALLKEYS
-        + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl;
+    return new Path(baseDir, skewJoinPrefix + UNDERLINE + SMALLKEYS
+        + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
index 419494b..83b8d6e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
@@ -102,7 +102,7 @@ private void processCurrentTask(Task<? extends Serializable> currTask,
       if (localwork != null) {
         // get the context info and set up the shared tmp URI
         Context ctx = physicalContext.getContext();
-        Path tmpPath = Utilities.generateTmpPath(new Path(ctx.getLocalTmpFileURI()), currTask.getId());
+        Path tmpPath = Utilities.generateTmpPath(ctx.getLocalTmpPath(), currTask.getId());
         localwork.setTmpPath(tmpPath);
         mapredWork.getMapWork().setTmpHDFSPath(Utilities.generateTmpPath(
           ctx.getMRTmpPath(), currTask.getId()));
@@ -167,15 +167,15 @@ private void processCurrentTask(Task<? extends Serializable> currTask,
           // get bigKeysDirToTaskMap
           ConditionalResolverSkewJoinCtx context = (ConditionalResolverSkewJoinCtx) conditionalTask
              .getResolverCtx();
-          HashMap<String, Task<? extends Serializable>> bigKeysDirToTaskMap = context
+          HashMap<Path, Task<? extends Serializable>> bigKeysDirToTaskMap = context
              .getDirToTaskMap();
           // to avoid concurrent modify the hashmap
-          HashMap<String, Task<? extends Serializable>> newbigKeysDirToTaskMap = new HashMap<String, Task<? extends Serializable>>();
+          HashMap<Path, Task<? extends Serializable>> newbigKeysDirToTaskMap = new HashMap<Path, Task<? extends Serializable>>();
           // reset the resolver
-          for (Map.Entry<String, Task<? extends Serializable>> entry : bigKeysDirToTaskMap
+          for (Map.Entry<Path, Task<? extends Serializable>> entry : bigKeysDirToTaskMap
              .entrySet()) {
             Task<? extends Serializable> task = entry.getValue();
-            String key = entry.getKey();
+            Path key = entry.getKey();
             if (task.equals(currTask)) {
               newbigKeysDirToTaskMap.put(key, localTask);
             } else {
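The directory layout these helpers produce, sketched under the assumption that the constants not shown in the hunk (skewJoinPrefix, UNDERLINE, BIGKEYS) carry the values "hive_skew_join", "_" and "bigkeys" as in the Hive source of this era; only SMALLKEYS and RESULTS are visible above:

import org.apache.hadoop.fs.Path;

// Illustrative layout of the per-table skew-join directories created above.
// The constant values are assumptions; verify against the actual class.
class SkewDirSketch {
  public static void main(String[] args) {
    Path baseTmpDir = new Path("hdfs://nn/tmp/hive/-mr-10005"); // illustrative
    Byte big = 0;
    Byte small = 1;
    System.out.println(new Path(baseTmpDir, "hive_skew_join_bigkeys_" + big));
    System.out.println(new Path(baseTmpDir, "hive_skew_join_smallkeys_" + big + "_" + small));
  }
}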
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 81426cd..0e2d555 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -267,51 +267,51 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       analyzeDropIndex(ast);
       break;
     case HiveParser.TOK_DESCTABLE:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeDescribeTable(ast);
       break;
     case HiveParser.TOK_SHOWDATABASES:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowDatabases(ast);
       break;
     case HiveParser.TOK_SHOWTABLES:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowTables(ast);
       break;
    case HiveParser.TOK_SHOWCOLUMNS:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowColumns(ast);
       break;
    case HiveParser.TOK_SHOW_TABLESTATUS:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowTableStatus(ast);
       break;
    case HiveParser.TOK_SHOW_TBLPROPERTIES:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowTableProperties(ast);
       break;
    case HiveParser.TOK_SHOWFUNCTIONS:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowFunctions(ast);
       break;
    case HiveParser.TOK_SHOWLOCKS:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowLocks(ast);
       break;
    case HiveParser.TOK_SHOWDBLOCKS:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowDbLocks(ast);
       break;
    case HiveParser.TOK_DESCFUNCTION:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeDescFunction(ast);
       break;
    case HiveParser.TOK_DESCDATABASE:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeDescDatabase(ast);
       break;
    case HiveParser.TOK_MSCK:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeMetastoreCheck(ast);
       break;
    case HiveParser.TOK_DROPVIEW:
@@ -381,15 +381,15 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       analyzeAlterIndexProps(ast);
       break;
     case HiveParser.TOK_SHOWPARTITIONS:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowPartitions(ast);
       break;
     case HiveParser.TOK_SHOW_CREATETABLE:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowCreateTable(ast);
       break;
     case HiveParser.TOK_SHOWINDEXES:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowIndexes(ast);
       break;
     case HiveParser.TOK_LOCKTABLE:
@@ -423,11 +423,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       analyzeDropRole(ast);
       break;
     case HiveParser.TOK_SHOW_ROLE_GRANT:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowRoleGrant(ast);
       break;
     case HiveParser.TOK_SHOW_ROLES:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowRoles(ast);
       break;
     case HiveParser.TOK_GRANT_ROLE:
@@ -440,7 +440,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       analyzeGrant(ast);
       break;
     case HiveParser.TOK_SHOW_GRANT:
-      ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+      ctx.setResFile(ctx.getLocalTmpPath());
       analyzeShowGrant(ast);
       break;
     case HiveParser.TOK_REVOKE:
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index eeee327..a9e43a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -65,7 +65,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     sem.analyze(input, ctx);
     sem.validate();
 
-    ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+    ctx.setResFile(ctx.getLocalTmpPath());
     List<Task<? extends Serializable>> tasks = sem.getRootTasks();
     Task<? extends Serializable> fetchTask = sem.getFetchTask();
     if (tasks == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 7b2e2e6..0d1dd10 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -87,8 +87,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     if (ts.tableHandle.isPartitioned()) {
       partitions = (ts.partitions != null) ? ts.partitions : db.getPartitions(ts.tableHandle);
     }
-    String tmpfile = ctx.getLocalTmpFileURI();
-    Path path = new Path(tmpfile, "_metadata");
+    Path path = new Path(ctx.getLocalTmpPath(), "_metadata");
     EximUtil.createExportDump(FileSystem.getLocal(conf), path, ts.tableHandle, partitions);
     Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
         path, new Path(toURI), false), conf);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
index 184941f..9934fdf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
@@ -50,7 +50,7 @@
     // tables into corresponding different dirs (one dir per table).
     // this map stores mapping from "big key dir" to its corresponding mapjoin
     // task.
-    private HashMap<String, Task<? extends Serializable>> dirToTaskMap;
+    private HashMap<Path, Task<? extends Serializable>> dirToTaskMap;
     private Task<? extends Serializable> noSkewTask;
 
     /**
@@ -60,19 +60,19 @@ public ConditionalResolverSkewJoinCtx() {
     }
 
     public ConditionalResolverSkewJoinCtx(
-        HashMap<String, Task<? extends Serializable>> dirToTaskMap,
+        HashMap<Path, Task<? extends Serializable>> dirToTaskMap,
         Task<? extends Serializable> noSkewTask) {
       super();
       this.dirToTaskMap = dirToTaskMap;
       this.noSkewTask = noSkewTask;
     }
 
-    public HashMap<String, Task<? extends Serializable>> getDirToTaskMap() {
+    public HashMap<Path, Task<? extends Serializable>> getDirToTaskMap() {
       return dirToTaskMap;
     }
 
     public void setDirToTaskMap(
-        HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
+        HashMap<Path, Task<? extends Serializable>> dirToTaskMap) {
       this.dirToTaskMap = dirToTaskMap;
     }
@@ -94,16 +94,14 @@ public ConditionalResolverSkewJoin() {
     ConditionalResolverSkewJoinCtx ctx = (ConditionalResolverSkewJoinCtx) objCtx;
     List<Task<? extends Serializable>> resTsks = new ArrayList<Task<? extends Serializable>>();
 
-    Map<String, Task<? extends Serializable>> dirToTaskMap = ctx
+    Map<Path, Task<? extends Serializable>> dirToTaskMap = ctx
         .getDirToTaskMap();
-    Iterator<Entry<String, Task<? extends Serializable>>> bigKeysPathsIter = dirToTaskMap
+    Iterator<Entry<Path, Task<? extends Serializable>>> bigKeysPathsIter = dirToTaskMap
         .entrySet().iterator();
     try {
       while (bigKeysPathsIter.hasNext()) {
-        Entry<String, Task<? extends Serializable>> entry = bigKeysPathsIter
-            .next();
-        String path = entry.getKey();
-        Path dirPath = new Path(path);
+        Entry<Path, Task<? extends Serializable>> entry = bigKeysPathsIter.next();
+        Path dirPath = entry.getKey();
         FileSystem inpFs = dirPath.getFileSystem(conf);
         FileStatus[] fstatus = Utilities.listStatusIfExists(dirPath, inpFs);
         if (fstatus != null && fstatus.length > 0) {
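Keying dirToTaskMap by Path rather than String is behavior preserving because org.apache.hadoop.fs.Path defines equals and hashCode over its normalized URI, so equivalent spellings collide as expected. A quick demonstration:

import java.util.HashMap;
import org.apache.hadoop.fs.Path;

// Why Path works as the HashMap key above: equality and hashing are based on
// the normalized URI produced at construction time.
class PathKeySketch {
  public static void main(String[] args) {
    HashMap<Path, String> m = new HashMap<Path, String>();
    m.put(new Path("hdfs://nn/tmp/bigkeys/"), "task-1");
    // Trailing slashes and duplicate separators are normalized away.
    System.out.println(m.get(new Path("hdfs://nn/tmp//bigkeys"))); // task-1
  }
}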
 *
@@ -41,8 +43,8 @@
   // used to handle skew join
   private boolean handleSkewJoin = false;
   private int skewKeyDefinition = -1;
-  private Map<Byte, String> bigKeysDirMap;
-  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, Path> bigKeysDirMap;
+  private Map<Byte, Map<Byte, Path>> smallKeysDirMap;
   private Map<Byte, TableDesc> skewKeysValuesTables;
 
   // alias to key mapping
@@ -173,22 +175,22 @@ public void setSkewKeyDefinition(int skewKeyDefinition) {
   }
 
   @Override
-  public Map<Byte, String> getBigKeysDirMap() {
+  public Map<Byte, Path> getBigKeysDirMap() {
     return bigKeysDirMap;
   }
 
   @Override
-  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+  public void setBigKeysDirMap(Map<Byte, Path> bigKeysDirMap) {
     this.bigKeysDirMap = bigKeysDirMap;
   }
 
   @Override
-  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+  public Map<Byte, Map<Byte, Path>> getSmallKeysDirMap() {
     return smallKeysDirMap;
   }
 
   @Override
-  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, Path>> smallKeysDirMap) {
     this.smallKeysDirMap = smallKeysDirMap;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 4b9feac..2168811 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.Path;
+
 /**
  * Join operator Descriptor implementation.
@@ -44,8 +46,8 @@
   // used to handle skew join
   private boolean handleSkewJoin = false;
   private int skewKeyDefinition = -1;
-  private Map<Byte, String> bigKeysDirMap;
-  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, Path> bigKeysDirMap;
+  private Map<Byte, Map<Byte, Path>> smallKeysDirMap;
   private Map<Byte, TableDesc> skewKeysValuesTables;
 
   // alias to key mapping
@@ -128,12 +130,12 @@ public Object clone() {
     }
 
     if (getBigKeysDirMap() != null) {
-      Map<Byte, String> cloneBigKeysDirMap = new HashMap<Byte, String>();
+      Map<Byte, Path> cloneBigKeysDirMap = new HashMap<Byte, Path>();
       cloneBigKeysDirMap.putAll(getBigKeysDirMap());
       ret.setBigKeysDirMap(cloneBigKeysDirMap);
     }
     if (getSmallKeysDirMap() != null) {
-      Map<Byte, Map<Byte, String>> cloneSmallKeysDirMap = new HashMap<Byte, Map<Byte, String>>();
+      Map<Byte, Map<Byte, Path>> cloneSmallKeysDirMap = new HashMap<Byte, Map<Byte, Path>>();
       cloneSmallKeysDirMap.putAll(getSmallKeysDirMap());
       ret.setSmallKeysDirMap(cloneSmallKeysDirMap);
     }
@@ -364,7 +366,7 @@ public void setHandleSkewJoin(boolean handleSkewJoin) {
   /**
    * @return mapping from tbl to dir for big keys.
    */
-  public Map<Byte, String> getBigKeysDirMap() {
+  public Map<Byte, Path> getBigKeysDirMap() {
     return bigKeysDirMap;
   }
 
@@ -373,14 +375,14 @@ public void setHandleSkewJoin(boolean handleSkewJoin) {
    *
    * @param bigKeysDirMap
    */
-  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+  public void setBigKeysDirMap(Map<Byte, Path> bigKeysDirMap) {
     this.bigKeysDirMap = bigKeysDirMap;
   }
 
   /**
    * @return mapping from tbl to dir for small keys
    */
-  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+  public Map<Byte, Map<Byte, Path>> getSmallKeysDirMap() {
     return smallKeysDirMap;
   }
 
@@ -389,7 +391,7 @@ public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
   /**
    *
    * @param smallKeysDirMap
    */
-  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, Path>> smallKeysDirMap) {
     this.smallKeysDirMap = smallKeysDirMap;
   }
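One point worth noting about the clone() hunk above: the existing shallow putAll copy stays correct after the type change because Path, like String, is immutable, so the original and the clone can safely share values. Sketched:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;

// Sketch of the copy done in JoinDesc.clone(): a shallow copy suffices
// because both the Byte keys and the Path values are immutable.
class CloneSketch {
  static Map<Byte, Path> cloneBigKeysDirMap(Map<Byte, Path> src) {
    Map<Byte, Path> copy = new HashMap<Byte, Path>();
    copy.putAll(src);
    return copy;
  }
}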