diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java index 7b861ed..e8b9fb5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -311,9 +311,8 @@ public boolean isMRTmpFileURI(String uriStr) { * * @return next available path for map-red intermediate data */ - public String getMRTmpFileURI() { - return getMRScratchDir() + Path.SEPARATOR + MR_PREFIX + - nextPathId(); + public Path getMRTmpFileURI() { + return new Path(getMRScratchDir(), MR_PREFIX + nextPathId()); } @@ -359,9 +358,8 @@ public String getLocalTmpFileURI() { * external URI to which the tmp data has to be eventually moved * @return next available tmp path on the file system corresponding extURI */ - public String getExternalTmpFileURI(URI extURI) { - return getExternalScratchDir(extURI) + Path.SEPARATOR + EXT_PREFIX + - nextPathId(); + public Path getExternalTmpFileURI(URI extURI) { + return new Path(getExternalScratchDir(extURI), EXT_PREFIX + nextPathId()); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 328c14b..c7d6ed8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1538,9 +1538,7 @@ private int unarchive(Hive db, AlterTableSimpleDesc simpleDesc) throw new HiveException("Haven't found any archive where it should be"); } - Path tmpPath = new Path(driverContext - .getCtx() - .getExternalTmpFileURI(originalDir.toUri())); + Path tmpPath = driverContext.getCtx().getExternalTmpFileURI(originalDir.toUri()); try { fs = tmpPath.getFileSystem(conf); @@ -1548,11 +1546,6 @@ private int unarchive(Hive db, AlterTableSimpleDesc simpleDesc) throw new HiveException(e); } - // Some sanity checks - if (originalDir == null) { - throw new HiveException("Missing archive data in the partition"); - } - // Clarification of terms: // - The originalDir directory represents the original directory of the // partitions' files. 
They now contain an archived version of those files diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 8e0e9b7..50afe55 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -253,13 +253,13 @@ private void initializeSpecPath() { // The movetask that follows subQ1 and subQ2 tasks still moves the directory // 'Parent' if ((!conf.isLinkedFileSink()) || (dpCtx == null)) { - specPath = new Path(conf.getDirName()); + specPath = conf.getDirName(); childSpecPathDynLinkedPartitions = null; return; } - specPath = new Path(conf.getParentDir()); - childSpecPathDynLinkedPartitions = Utilities.getFileNameFromDirName(conf.getDirName()); + specPath = conf.getParentDir(); + childSpecPathDynLinkedPartitions = conf.getDirName().getName(); } @Override @@ -816,7 +816,7 @@ public void jobCloseOp(Configuration hconf, boolean success) throws HiveException { try { if ((conf != null) && isNativeTable) { - String specPath = conf.getDirName(); + Path specPath = conf.getDirName(); DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); if (conf.isLinkedFileSink() && (dpCtx != null)) { specPath = conf.getParentDir(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 3e17ae7..4350607 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -150,7 +150,7 @@ public void jobCloseOp(Configuration hconf, boolean success) if (conf.getHandleSkewJoin()) { try { for (int i = 0; i < numAliases; i++) { - String specPath = conf.getBigKeysDirMap().get((byte) i); + Path specPath = conf.getBigKeysDirMap().get((byte) i); mvFileToFinalPath(specPath, hconf, success, LOG); for (int j = 0; j < numAliases; j++) { if (j == i) { @@ -165,7 +165,7 @@ public void jobCloseOp(Configuration hconf, boolean success) if (success) { // move up files for (int i = 0; i < numAliases; i++) { - String specPath = conf.getBigKeysDirMap().get((byte) i); + Path specPath = conf.getBigKeysDirMap().get((byte) i); moveUpFiles(specPath, hconf, LOG); for (int j = 0; j < numAliases; j++) { if (j == i) { @@ -184,16 +184,14 @@ public void jobCloseOp(Configuration hconf, boolean success) super.jobCloseOp(hconf, success); } - private void moveUpFiles(String specPath, Configuration hconf, Log log) + private void moveUpFiles(Path specPath, Configuration hconf, Log log) throws IOException, HiveException { - FileSystem fs = (new Path(specPath)).getFileSystem(hconf); - Path finalPath = new Path(specPath); - - if (fs.exists(finalPath)) { - FileStatus[] taskOutputDirs = fs.listStatus(finalPath); + FileSystem fs = specPath.getFileSystem(hconf); + if (fs.exists(specPath)) { + FileStatus[] taskOutputDirs = fs.listStatus(specPath); if (taskOutputDirs != null) { for (FileStatus dir : taskOutputDirs) { - Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath); + Utilities.renameOrMoveFiles(fs, dir.getPath(), specPath); fs.delete(dir.getPath(), true); } } @@ -210,15 +208,13 @@ private void moveUpFiles(String specPath, Configuration hconf, Log log) * @throws IOException * @throws HiveException */ - private void mvFileToFinalPath(String specPath, Configuration hconf, + private void mvFileToFinalPath(Path specPath, Configuration hconf, boolean success, Log log) throws IOException, HiveException { - FileSystem fs = (new 
Path(specPath)).getFileSystem(hconf); + FileSystem fs = specPath.getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate"); - Path finalPath = new Path(specPath); - ArrayList emptyBuckets = null; if (success) { if (fs.exists(tmpPath)) { // Step1: rename tmp output folder to intermediate path. After this @@ -229,8 +225,8 @@ private void mvFileToFinalPath(String specPath, Configuration hconf, // Step2: remove any tmp file or double-committed output files Utilities.removeTempOrDuplicateFiles(fs, intermediatePath); // Step3: move to the file destination - log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath); - Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath); + log.info("Moving tmp dir: " + intermediatePath + " to: " + specPath); + Utilities.renameOrMoveFiles(fs, intermediatePath, specPath); } } else { fs.delete(tmpPath, true); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java index 190cd93..ab85ddd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java @@ -177,7 +177,7 @@ public void initiliaze(Configuration hconf) { void endGroup() throws IOException, HiveException { if (skewKeyInCurrentGroup) { - String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag); + Path specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag); RowContainer> bigKey = (RowContainer)joinOp.storage[currBigKeyTag]; Path outputPath = getOperatorOutputPath(specPath); FileSystem destFs = outputPath.getFileSystem(hconf); @@ -258,7 +258,7 @@ public void close(boolean abort) throws HiveException { } try { - String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl); + Path specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl); Path bigKeyPath = getOperatorOutputPath(specPath); FileSystem fs = bigKeyPath.getFileSystem(hconf); delete(bigKeyPath, fs); @@ -295,7 +295,7 @@ private void commit() throws IOException { continue; } - String specPath = conf.getBigKeysDirMap().get( + Path specPath = conf.getBigKeysDirMap().get( Byte.valueOf((byte) bigKeyTbl)); commitOutputPathToFinalPath(specPath, false); for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) { @@ -311,7 +311,7 @@ private void commit() throws IOException { } } - private void commitOutputPathToFinalPath(String specPath, + private void commitOutputPathToFinalPath(Path specPath, boolean ignoreNonExisting) throws IOException { Path outPath = getOperatorOutputPath(specPath); Path finalPath = getOperatorFinalPath(specPath); @@ -334,12 +334,12 @@ private void commitOutputPathToFinalPath(String specPath, } } - private Path getOperatorOutputPath(String specPath) throws IOException { + private Path getOperatorOutputPath(Path specPath) throws IOException { Path tmpPath = Utilities.toTempPath(specPath); return new Path(tmpPath, Utilities.toTempPath(taskId)); } - private Path getOperatorFinalPath(String specPath) throws IOException { + private Path getOperatorFinalPath(Path specPath) throws IOException { Path tmpPath = Utilities.toTempPath(specPath); return new Path(tmpPath, taskId); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 7dc3d59..bd53a8b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -533,22 +533,22 @@ protected Expression instantiate(Object oldInstance, Encoder out) { } } - public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) { + public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { setMapWork(conf, w.getMapWork(), hiveScratchDir, true); if (w.getReduceWork() != null) { setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true); } } - public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) { + public static Path setMapWork(Configuration conf, MapWork w, Path hiveScratchDir, boolean useCache) { return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache); } - public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) { + public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScratchDir, boolean useCache) { return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); } - private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) { + private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { try { setPlanPath(conf, hiveScratchDir); @@ -594,7 +594,7 @@ private static Path getPlanPath(Configuration conf, String name) { return new Path(planPath, name); } - private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException { + private static void setPlanPath(Configuration conf, Path hiveScratchDir) throws IOException { if (getPlanPath(conf) == null) { // this is the unique conf ID, which is kept in JobConf as part of the plan file name String jobID = UUID.randomUUID().toString(); @@ -1541,14 +1541,6 @@ private static String getIdFromFilename(String filename, Pattern pattern) { return taskId; } - public static String getFileNameFromDirName(String dirName) { - int dirEnd = dirName.lastIndexOf(Path.SEPARATOR); - if (dirEnd != -1) { - return dirName.substring(dirEnd + 1); - } - return dirName; - } - /** * Replace the task id from the filename. 
It is assumed that the filename is derived from the * output of getTaskId @@ -1650,15 +1642,14 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI } } - public static void mvFileToFinalPath(String specPath, Configuration hconf, + public static void mvFileToFinalPath(Path specPath, Configuration hconf, boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter) throws IOException, HiveException { - FileSystem fs = (new Path(specPath)).getFileSystem(hconf); + FileSystem fs = specPath.getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); - Path finalPath = new Path(specPath); if (success) { if (fs.exists(tmpPath)) { // remove any tmp file or double-committed output files @@ -1670,8 +1661,8 @@ public static void mvFileToFinalPath(String specPath, Configuration hconf, } // move to the file destination - log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath); - Utilities.renameOrMoveFiles(fs, tmpPath, finalPath); + log.info("Moving tmp dir: " + tmpPath + " to: " + specPath); + Utilities.renameOrMoveFiles(fs, tmpPath, specPath); } } else { fs.delete(tmpPath, true); @@ -2263,7 +2254,7 @@ private static void getMRTasks(List> tasks, List getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx) + public static List getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, Context ctx) throws Exception { int sequenceNumber = 0; @@ -2912,7 +2902,7 @@ public static double getHighestSamplePercentage (MapWork work) { } @SuppressWarnings({"rawtypes", "unchecked"}) - private static Path createEmptyFile(String hiveScratchDir, + private static Path createEmptyFile(Path hiveScratchDir, Class outFileFormat, JobConf job, int sequenceNumber, Properties props, boolean dummyRow) throws IOException, InstantiationException, IllegalAccessException { @@ -2929,7 +2919,6 @@ private static Path createEmptyFile(String hiveScratchDir, String newFile = newDir + File.separator + "emptyFile"; Path newFilePath = new Path(newFile); - String onefile = newPath.toString(); FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, props, null); if (dummyRow) { @@ -2945,7 +2934,7 @@ private static Path createEmptyFile(String hiveScratchDir, @SuppressWarnings("rawtypes") private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work, - String hiveScratchDir, String alias, int sequenceNumber) + Path hiveScratchDir, String alias, int sequenceNumber) throws IOException, InstantiationException, IllegalAccessException { String strPath = path.toString(); @@ -2987,7 +2976,7 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map @SuppressWarnings("rawtypes") private static Path createDummyFileForEmptyTable(JobConf job, MapWork work, - String hiveScratchDir, String alias, int sequenceNumber) + Path hiveScratchDir, String alias, int sequenceNumber) throws IOException, InstantiationException, IllegalAccessException { TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc(); @@ -3055,8 +3044,8 @@ public static void setInputAttributes(Configuration conf, MapWork mWork) { HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat()); } if (mWork.getIndexIntermediateFile() != null) { - conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile()); - conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile()); + 
conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile().toString()); + conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile().toString()); } // Intentionally overwrites anything the user may have put here @@ -3115,10 +3104,10 @@ private static void createTmpDirs(Configuration conf, if (op instanceof FileSinkOperator) { FileSinkDesc fdesc = ((FileSinkOperator) op).getConf(); - String tempDir = fdesc.getDirName(); + Path tempDir = fdesc.getDirName(); if (tempDir != null) { - Path tempPath = Utilities.toTempPath(new Path(tempDir)); + Path tempPath = Utilities.toTempPath(tempDir); FileSystem fs = tempPath.getFileSystem(conf); fs.mkdirs(tempPath); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 42d764d..88c405b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -203,7 +203,6 @@ public int execute(DriverContext driverContext) { Context ctx = driverContext.getCtx(); boolean ctxCreated = false; - String emptyScratchDirStr; Path emptyScratchDir; MapWork mWork = work.getMapWork(); @@ -215,8 +214,7 @@ public int execute(DriverContext driverContext) { ctxCreated = true; } - emptyScratchDirStr = ctx.getMRTmpFileURI(); - emptyScratchDir = new Path(emptyScratchDirStr); + emptyScratchDir = ctx.getMRTmpFileURI(); FileSystem fs = emptyScratchDir.getFileSystem(job); fs.mkdirs(emptyScratchDir); } catch (IOException e) { @@ -332,7 +330,7 @@ public int execute(DriverContext driverContext) { if (localwork != null) { if (!ShimLoader.getHadoopShims().isLocalMode(job)) { Path localPath = new Path(localwork.getTmpFileURI()); - Path hdfsPath = new Path(mWork.getTmpHDFSFileURI()); + Path hdfsPath = mWork.getTmpHDFSFileURI(); FileSystem hdfs = hdfsPath.getFileSystem(job); FileSystem localFS = localPath.getFileSystem(job); @@ -370,7 +368,7 @@ public int execute(DriverContext driverContext) { } } work.configureJobConf(job); - List inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx); + List inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx); Utilities.setInputPaths(job, inputPaths); Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI()); @@ -503,7 +501,7 @@ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, H inputPaths.add(new Path(path)); } - String tmpPath = context.getCtx().getExternalTmpFileURI(inputPaths.get(0).toUri()); + Path tmpPath = context.getCtx().getExternalTmpFileURI(inputPaths.get(0).toUri()); Path partitionFile = new Path(tmpPath, ".partitions"); ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile); PartitionKeySampler sampler = new PartitionKeySampler(); @@ -555,8 +553,8 @@ protected void setInputAttributes(Configuration conf) { HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat()); } if (mWork.getIndexIntermediateFile() != null) { - conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile()); - conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile()); + conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile().toString()); + conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile().toString()); } // Intentionally overwrites anything the user may have put here diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java index 
617723e..f0644e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -37,7 +38,7 @@ private HashSet additionalSemanticInputs; // additional inputs to add to the parse context when // merging the index query tasks private String indexInputFormat; // input format to set on the TableScanOperator to activate indexing - private String indexIntermediateFile; // name of intermediate file written by the index query for the + private Path indexIntermediateFile; // name of intermediate file written by the index query for the // TableScanOperator to use private List> queryTasks; // list of tasks that will execute the index query and write // results to a temporary file @@ -69,10 +70,10 @@ public void setIndexInputFormat(String indexInputFormat) { this.indexInputFormat = indexInputFormat; } - public String getIndexIntermediateFile() { + public Path getIndexIntermediateFile() { return indexIntermediateFile; } - public void setIndexIntermediateFile(String indexIntermediateFile) { + public void setIndexIntermediateFile(Path indexIntermediateFile) { this.indexIntermediateFile = indexIntermediateFile; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index 34ced8b..dc5e3d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; @@ -97,7 +98,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, // Build reentrant QL for index query StringBuilder qlCommand = new StringBuilder("INSERT OVERWRITE DIRECTORY "); - String tmpFile = pctx.getContext().getMRTmpFileURI(); + Path tmpFile = pctx.getContext().getMRTmpFileURI(); qlCommand.append( "\"" + tmpFile + "\" "); // QL includes " around file name qlCommand.append("SELECT bucketname AS `_bucketname` , COLLECT_SET(offset) AS `_offsets` FROM "); qlCommand.append("(SELECT `_bucketname` AS bucketname , `_offset` AS offset FROM "); diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 57ebd1e..149dcc2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -170,7 +171,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, // Build reentrant QL for index query StringBuilder 
qlCommand = new StringBuilder("INSERT OVERWRITE DIRECTORY "); - String tmpFile = pctx.getContext().getMRTmpFileURI(); + Path tmpFile = pctx.getContext().getMRTmpFileURI(); queryContext.setIndexIntermediateFile(tmpFile); qlCommand.append( "\"" + tmpFile + "\" "); // QL includes " around file name qlCommand.append("SELECT `_bucketname` , `_offsets` FROM "); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java index 2f23802..e633431 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java @@ -316,7 +316,7 @@ public static void jobClose(Path outputPath, boolean success, JobConf job, ) throws HiveException, IOException { FileSystem fs = outputPath.getFileSystem(job); Path backupPath = backupOutputPath(fs, outputPath, job); - Utilities.mvFileToFinalPath(outputPath.toUri().toString(), job, success, LOG, dynPartCtx, null, + Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null, reporter); fs.delete(backupPath, true); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java index d297f0a..2aed3bc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java @@ -234,7 +234,7 @@ public static void jobClose(Path outputPath, boolean success, JobConf job, ) throws HiveException, IOException { FileSystem fs = outputPath.getFileSystem(job); Path backupPath = backupOutputPath(fs, outputPath, job); - Utilities.mvFileToFinalPath(outputPath.toUri().toString(), job, success, LOG, dynPartCtx, null, + Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null, reporter); fs.delete(backupPath, true); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java index 0da5790..7c61fa3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; @@ -92,13 +93,13 @@ public String getCurrAliasId() { */ public static class GenMRUnionCtx { final Task uTask; - List taskTmpDir; + List taskTmpDir; List tt_desc; List> listTopOperators; public GenMRUnionCtx(Task uTask) { this.uTask = uTask; - taskTmpDir = new ArrayList(); + taskTmpDir = new ArrayList(); tt_desc = new ArrayList(); listTopOperators = new ArrayList>(); } @@ -107,11 +108,11 @@ public GenMRUnionCtx(Task uTask) { return uTask; } - public void addTaskTmpDir(String taskTmpDir) { + public void addTaskTmpDir(Path taskTmpDir) { this.taskTmpDir.add(taskTmpDir); } - public List getTaskTmpDir() { + public List getTaskTmpDir() { return taskTmpDir; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java index c580818..69b72e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Stack; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -124,7 +125,7 @@ private void processSubQueryUnionCreateIntermediate( // generate the temporary file Context baseCtx = parseCtx.getContext(); - String taskTmpDir = baseCtx.getMRTmpFileURI(); + Path taskTmpDir = baseCtx.getMRTmpFileURI(); // Create the temporary file, its corresponding FileSinkOperaotr, and // its corresponding TableScanOperator. diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 1e89fcd..a06f06f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -204,7 +204,7 @@ private static void setUnionPlan(GenMRProcContext opProcCtx, currTopOp = null; opProcCtx.setCurrTopOp(currTopOp); } else { - List taskTmpDirLst = uCtx.getTaskTmpDir(); + List taskTmpDirLst = uCtx.getTaskTmpDir(); if ((taskTmpDirLst != null) && !(taskTmpDirLst.isEmpty())) { List tt_descLst = uCtx.getTTDesc(); assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty(); @@ -217,16 +217,16 @@ private static void setUnionPlan(GenMRProcContext opProcCtx, MapredWork plan = (MapredWork) currTask.getWork(); for (int pos = 0; pos < size; pos++) { - String taskTmpDir = taskTmpDirLst.get(pos); + Path taskTmpDir = taskTmpDirLst.get(pos); TableDesc tt_desc = tt_descLst.get(pos); MapWork mWork = plan.getMapWork(); if (mWork.getPathToAliases().get(taskTmpDir) == null) { - mWork.getPathToAliases().put(taskTmpDir, + mWork.getPathToAliases().put(taskTmpDir.toString(), new ArrayList()); - mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir); - mWork.getPathToPartitionInfo().put(taskTmpDir, + mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir.toString()); + mWork.getPathToPartitionInfo().put(taskTmpDir.toString(), new PartitionDesc(tt_desc, null)); - mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos)); + mWork.getAliasToWork().put(taskTmpDir.toString(), topOperators.get(pos)); } } } @@ -945,7 +945,7 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc */ protected static TableScanOperator createTemporaryFile( Operator parent, Operator child, - String taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) { + Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) { // Create a FileSinkOperator for the file name of taskTmpDir boolean compressIntermediate = @@ -1007,7 +1007,7 @@ private static void splitTasks(ReduceSinkOperator op, // Generate the temporary file name Context baseCtx = parseCtx.getContext(); - String taskTmpDir = baseCtx.getMRTmpFileURI(); + Path taskTmpDir = baseCtx.getMRTmpFileURI(); Operator parent = op.getParentOperators().get(0); TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils @@ -1022,7 +1022,7 @@ private static void splitTasks(ReduceSinkOperator op, opProcCtx.getMapCurrCtx(); mapCurrCtx.put(tableScanOp, new GenMapRedCtx(childTask, null)); - String streamDesc = taskTmpDir; + String streamDesc = taskTmpDir.toString(); MapredWork cplan = (MapredWork) childTask.getWork(); if (needsTagging(cplan.getReduceWork())) { @@ -1054,7 +1054,7 @@ private static void splitTasks(ReduceSinkOperator op, } // Add the path to alias mapping - 
setTaskPlan(taskTmpDir, streamDesc, tableScanOp, cplan.getMapWork(), false, tt_desc); + setTaskPlan(taskTmpDir.toString(), streamDesc, tableScanOp, cplan.getMapWork(), false, tt_desc); opProcCtx.setCurrTopOp(null); opProcCtx.setCurrAliasId(null); opProcCtx.setCurrTask(childTask); @@ -1194,7 +1194,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // Create a FileSink operator TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); - FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName.toUri().toString(), ts, + FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts, conf.getBoolVar(ConfVars.COMPRESSRESULT)); FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( fsOutputDesc, inputRS, tsMerge); @@ -1239,7 +1239,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // 2. Constructing a conditional task consisting of a move task and a map reduce task // MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(new Path(fsInputDesc.getFinalDirName()), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); MapWork cplan; Serializable work; @@ -1445,7 +1445,7 @@ private static MapWork createMRWorkForMergingFiles (HiveConf conf, Operator topOp, FileSinkDesc fsDesc) { ArrayList aliases = new ArrayList(); - String inputDir = fsDesc.getFinalDirName(); + String inputDir = fsDesc.getFinalDirName().toString(); TableDesc tblDesc = fsDesc.getTableInfo(); aliases.add(inputDir); // dummy alias: just use the input path @@ -1471,7 +1471,7 @@ private static MapWork createMRWorkForMergingFiles (HiveConf conf, public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc, Path finalName, boolean hasDynamicPartitions) throws SemanticException { - String inputDir = fsInputDesc.getFinalDirName(); + Path inputDir = fsInputDesc.getFinalDirName(); TableDesc tblDesc = fsInputDesc.getTableInfo(); if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) { @@ -1479,22 +1479,22 @@ public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc, ArrayList inputDirstr = new ArrayList(1); if (!hasDynamicPartitions && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { - inputDirs.add(new Path(inputDir)); - inputDirstr.add(inputDir); + inputDirs.add(inputDir); + inputDirstr.add(inputDir.toString()); } MergeWork work = new MergeWork(inputDirs, finalName, hasDynamicPartitions, fsInputDesc.getDynPartCtx()); LinkedHashMap> pathToAliases = new LinkedHashMap>(); - pathToAliases.put(inputDir, (ArrayList) inputDirstr.clone()); + pathToAliases.put(inputDir.toString(), (ArrayList) inputDirstr.clone()); work.setMapperCannotSpanPartns(true); work.setPathToAliases(pathToAliases); work.setAliasToWork( new LinkedHashMap>()); if (hasDynamicPartitions || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { - work.getPathToPartitionInfo().put(inputDir, + work.getPathToPartitionInfo().put(inputDir.toString(), new PartitionDesc(tblDesc, null)); } work.setListBucketingCtx(fsInputDesc.getLbCtx()); @@ -1523,7 +1523,7 @@ public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc, @SuppressWarnings("unchecked") public static ConditionalTask createCondTask(HiveConf conf, Task currTask, MoveWork mvWork, - Serializable mergeWork, String inputPath) { + Serializable mergeWork, Path inputPath) { // There are 3 options for this ConditionalTask: // 1) Merge the partitions @@ -1591,7 +1591,7 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc 
fsInputDesc) { } if ((srcDir != null) - && (srcDir.equals(new Path(fsOp.getConf().getFinalDirName())))) { + && (srcDir.equals(fsOp.getConf().getFinalDirName()))) { return mvTsk; } } @@ -1672,20 +1672,20 @@ public static Path createMoveTask(Task currTask, boolean Path dest = null; if (chDir) { - dest = new Path(fsOp.getConf().getFinalDirName()); + dest = fsOp.getConf().getFinalDirName(); // generate the temporary file // it must be on the same file system as the current destination Context baseCtx = parseCtx.getContext(); - String tmpDir = baseCtx.getExternalTmpFileURI(dest.toUri()); + Path tmpDir = baseCtx.getExternalTmpFileURI(dest.toUri()); FileSinkDesc fileSinkDesc = fsOp.getConf(); // Change all the linked file sink descriptors if (fileSinkDesc.isLinkedFileSink()) { for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { - String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName()); + String fileName = fsConf.getDirName().getName(); fsConf.setParentDir(tmpDir); - fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName); + fsConf.setDirName(new Path(tmpDir, fileName)); } } else { fileSinkDesc.setDirName(tmpDir); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java index 476af4b..084f9f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Stack; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -94,7 +95,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } GroupByOperator pGBY = (GroupByOperator) stack.get(stack.size() - 5); - String fileName = FS.getConf().getFinalDirName(); + Path fileName = FS.getConf().getFinalDirName(); TableDesc tsDesc = createIntermediateFS(pGBY, fileName); for (AggregationDesc aggregation : cGBY.getConf().getAggregators()) { @@ -112,7 +113,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } - private TableDesc createIntermediateFS(Operator parent, String fileName) { + private TableDesc createIntermediateFS(Operator parent, Path fileName) { TableDesc tsDesc = PlanUtils.getIntermediateFileTableDesc(PlanUtils .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java index 296fecb..b0be340 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -42,20 +43,20 @@ private Map, List> bucketedColsByOp; // A mapping from a directory which a FileSinkOperator writes into to the columns by which that // output is bucketed - private Map> bucketedColsByDirectory; + private Map> bucketedColsByDirectory; // A mapping from an operator to the columns by which it's output is sorted private Map, List> sortedColsByOp; // A mapping 
from a directory which a FileSinkOperator writes into to the columns by which that // output is sorted - private Map> sortedColsByDirectory; + private Map> sortedColsByDirectory; public BucketingSortingCtx(boolean disableBucketing) { this.disableBucketing = disableBucketing; this.bucketedColsByOp = new HashMap, List>(); - this.bucketedColsByDirectory = new HashMap>(); + this.bucketedColsByDirectory = new HashMap>(); this.sortedColsByOp = new HashMap, List>(); - this.sortedColsByDirectory = new HashMap>(); + this.sortedColsByDirectory = new HashMap>(); } @@ -70,12 +71,12 @@ public void setBucketedCols(Operator op, List } } - public Map> getBucketedColsByDirectory() { + public Map> getBucketedColsByDirectory() { return disableBucketing ? null : bucketedColsByDirectory; } - public void setBucketedColsByDirectory(Map> bucketedColsByDirectory) { + public void setBucketedColsByDirectory(Map> bucketedColsByDirectory) { if (!disableBucketing) { this.bucketedColsByDirectory = bucketedColsByDirectory; } @@ -91,12 +92,12 @@ public void setSortedCols(Operator op, List sor this.sortedColsByOp.put(op, sortedCols); } - public Map> getSortedColsByDirectory() { + public Map> getSortedColsByDirectory() { return sortedColsByDirectory; } - public void setSortedColsByDirectory(Map> sortedColsByDirectory) { + public void setSortedColsByDirectory(Map> sortedColsByDirectory) { this.sortedColsByDirectory = sortedColsByDirectory; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 0820743..359d8de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; @@ -246,7 +247,7 @@ private void mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Conf } // The mapJoinTaskFileSinkOperator writes to a different directory - String childMRPath = mapJoinTaskFileSinkOperator.getConf().getDirName(); + Path childMRPath = mapJoinTaskFileSinkOperator.getConf().getDirName(); List childMRAliases = childMapWork.getPathToAliases().get(childMRPath); if (childMRAliases == null || childMRAliases.size() != 1) { return; @@ -258,7 +259,7 @@ private void mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Conf String path = entry.getKey(); List aliases = entry.getValue(); - if (path.equals(childMRPath)) { + if (path.equals(childMRPath.toString())) { continue; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index 4a775d9..ec17352 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -123,21 +123,21 @@ public static void processSkewJoin(JoinOperator joinOp, Task child = children != null && children.size() == 1 ? 
children.get(0) : null; - String baseTmpDir = parseCtx.getContext().getMRTmpFileURI(); + Path baseTmpDir = parseCtx.getContext().getMRTmpFileURI(); JoinDesc joinDescriptor = joinOp.getConf(); Map> joinValues = joinDescriptor.getExprs(); int numAliases = joinValues.size(); - Map bigKeysDirMap = new HashMap(); - Map> smallKeysDirMap = new HashMap>(); - Map skewJoinJobResultsDir = new HashMap(); + Map bigKeysDirMap = new HashMap(); + Map> smallKeysDirMap = new HashMap>(); + Map skewJoinJobResultsDir = new HashMap(); Byte[] tags = joinDescriptor.getTagOrder(); for (int i = 0; i < numAliases; i++) { Byte alias = tags[i]; - String bigKeysDir = getBigKeysDir(baseTmpDir, alias); + Path bigKeysDir = getBigKeysDir(baseTmpDir, alias); bigKeysDirMap.put(alias, bigKeysDir); - Map smallKeysMap = new HashMap(); + Map smallKeysMap = new HashMap(); smallKeysDirMap.put(alias, smallKeysMap); for (Byte src2 : tags) { if (!src2.equals(alias)) { @@ -154,8 +154,8 @@ public static void processSkewJoin(JoinOperator joinOp, joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVESKEWJOINKEY)); - HashMap> bigKeysDirToTaskMap = - new HashMap>(); + HashMap> bigKeysDirToTaskMap = + new HashMap>(); List listWorks = new ArrayList(); List> listTasks = new ArrayList>(); MapredWork currPlan = (MapredWork) currTask.getWork(); @@ -272,13 +272,13 @@ public static void processSkewJoin(JoinOperator joinOp, ArrayList aliases = new ArrayList(); String alias = src.toString(); aliases.add(alias); - String bigKeyDirPath = bigKeysDirMap.get(src); - newPlan.getPathToAliases().put(bigKeyDirPath, aliases); + Path bigKeyDirPath = bigKeysDirMap.get(src); + newPlan.getPathToAliases().put(bigKeyDirPath.toString(), aliases); newPlan.getAliasToWork().put(alias, tblScan_op); PartitionDesc part = new PartitionDesc(tableDescList.get(src), null); - newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part); + newPlan.getPathToPartitionInfo().put(bigKeyDirPath.toString(), part); newPlan.getAliasToPartnInfo().put(alias, part); Operator reducer = clonePlan.getReduceWork().getReducer(); @@ -297,7 +297,7 @@ public static void processSkewJoin(JoinOperator joinOp, MapredLocalWork localPlan = new MapredLocalWork( new LinkedHashMap>(), new LinkedHashMap()); - Map smallTblDirs = smallKeysDirMap.get(src); + Map smallTblDirs = smallKeysDirMap.get(src); for (int j = 0; j < numAliases; j++) { if (j == i) { @@ -306,7 +306,7 @@ public static void processSkewJoin(JoinOperator joinOp, Byte small_alias = tags[j]; Operator tblScan_op2 = parentOps[j]; localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2); - Path tblDir = new Path(smallTblDirs.get(small_alias)); + Path tblDir = smallTblDirs.get(small_alias); localPlan.getAliasToFetchWork().put(small_alias.toString(), new FetchWork(tblDir, tableDescList.get(small_alias))); } @@ -393,20 +393,20 @@ public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) { private static String SMALLKEYS = "smallkeys"; private static String RESULTS = "results"; - static String getBigKeysDir(String baseDir, Byte srcTbl) { - return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + BIGKEYS - + UNDERLINE + srcTbl; + private static Path getBigKeysDir(Path baseDir, Byte srcTbl) { + return new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS + + UNDERLINE + srcTbl); } - static String getBigKeysSkewJoinResultDir(String baseDir, Byte srcTbl) { - return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + BIGKEYS - + UNDERLINE + RESULTS + UNDERLINE + srcTbl; + private 
static Path getBigKeysSkewJoinResultDir(Path baseDir, Byte srcTbl) { + return new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS + + UNDERLINE + RESULTS + UNDERLINE + srcTbl); } - static String getSmallKeysDir(String baseDir, Byte srcTblBigTbl, + private static Path getSmallKeysDir(Path baseDir, Byte srcTblBigTbl, Byte srcTblSmallTbl) { - return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + SMALLKEYS - + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl; + return new Path(baseDir, skewJoinPrefix + UNDERLINE + SMALLKEYS + + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java index 010ac54..b9f345a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Stack; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -101,9 +102,9 @@ private void processCurrentTask(Task currTask, if (localwork != null) { // get the context info and set up the shared tmp URI Context ctx = physicalContext.getContext(); - String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId()); + String tmpFileURI = Utilities.generateTmpURI(new Path(ctx.getLocalTmpFileURI()), currTask.getId()).toString(); localwork.setTmpFileURI(tmpFileURI); - String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId()); + Path hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId()); mapredWork.getMapWork().setTmpHDFSFileURI(hdfsTmpURI); // create a task for this local work; right now, this local work is shared // by the original MapredTask and this new generated MapredLocalTask. @@ -166,15 +167,15 @@ private void processCurrentTask(Task currTask, // get bigKeysDirToTaskMap ConditionalResolverSkewJoinCtx context = (ConditionalResolverSkewJoinCtx) conditionalTask .getResolverCtx(); - HashMap> bigKeysDirToTaskMap = context + HashMap> bigKeysDirToTaskMap = context .getDirToTaskMap(); // to avoid concurrent modify the hashmap - HashMap> newbigKeysDirToTaskMap = new HashMap>(); + HashMap> newbigKeysDirToTaskMap = new HashMap>(); // reset the resolver - for (Map.Entry> entry : bigKeysDirToTaskMap + for (Map.Entry> entry : bigKeysDirToTaskMap .entrySet()) { Task task = entry.getValue(); - String key = entry.getKey(); + Path key = entry.getKey(); if (task.equals(currTask)) { newbigKeysDirToTaskMap.put(key, localTask); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java index da13abf..a985c4f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java @@ -212,7 +212,7 @@ private void pushOperatorsAboveUnion(UnionOperator union, // for each sub-query. 
Also, these different filesinks need to be linked to each other FileSinkOperator fileSinkOp = (FileSinkOperator)stack.get(pos); // For file sink operator, change the directory name - String parentDirName = fileSinkOp.getConf().getDirName(); + Path parentDirName = fileSinkOp.getConf().getDirName(); // Clone the fileSinkDesc of the final fileSink and create similar fileSinks at // each parent @@ -220,9 +220,7 @@ private void pushOperatorsAboveUnion(UnionOperator union, for (Operator parent : parents) { FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone(); - - String dirName = parentDirName + Path.SEPARATOR + parent.getIdentifier() ; - fileSinkDesc.setDirName(dirName); + fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier())); fileSinkDesc.setLinkedFileSink(true); fileSinkDesc.setParentDir(parentDirName); parent.setChildOperators(null); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index c836612..5697748 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -840,9 +840,9 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { TableDesc tblDesc = Utilities.getTableDesc(table); // Write the output to temporary directory and move it to the final location at the end // so the operation is atomic. - String queryTmpdir = ctx.getExternalTmpFileURI(newTblPartLoc.toUri()); - truncateTblDesc.setOutputDir(new Path(queryTmpdir)); - LoadTableDesc ltd = new LoadTableDesc(new Path(queryTmpdir), tblDesc, + Path queryTmpdir = ctx.getExternalTmpFileURI(newTblPartLoc.toUri()); + truncateTblDesc.setOutputDir(queryTmpdir); + LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap() : partSpec); ltd.setLbCtx(lbCtx); Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), @@ -1455,9 +1455,9 @@ private void analyzeAlterTablePartMergeFiles(ASTNode tablePartAST, ASTNode ast, ddlWork.setNeedLock(true); Task mergeTask = TaskFactory.get(ddlWork, conf); TableDesc tblDesc = Utilities.getTableDesc(tblObj); - String queryTmpdir = ctx.getExternalTmpFileURI(newTblPartLoc.toUri()); - mergeDesc.setOutputDir(new Path(queryTmpdir)); - LoadTableDesc ltd = new LoadTableDesc(new Path(queryTmpdir), tblDesc, + Path queryTmpdir = ctx.getExternalTmpFileURI(newTblPartLoc.toUri()); + mergeDesc.setOutputDir(queryTmpdir); + LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? 
new HashMap() : partSpec); ltd.setLbCtx(lbCtx); Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 9914b1d..09e90e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -281,7 +281,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { private Task loadTable(URI fromURI, Table table) { Path dataPath = new Path(fromURI.toString(), "data"); - Path tmpPath = new Path(ctx.getExternalTmpFileURI(fromURI)); + Path tmpPath = ctx.getExternalTmpFileURI(fromURI); Task copyTask = TaskFactory.get(new CopyWork(dataPath, tmpPath, false), conf); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, @@ -325,7 +325,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " + partSpecToString(addPartitionDesc.getPartSpec()) + " with source location: " + srcLocation); - Path tmpPath = new Path(ctx.getExternalTmpFileURI(fromURI)); + Path tmpPath = ctx.getExternalTmpFileURI(fromURI); Task copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), tmpPath, false), conf); Task addPartTask = TaskFactory.get(new DDLWork(getInputs(), diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 344bb42..b77efd8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -226,15 +226,14 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // might seem redundant in the case // that the hive warehouse is also located in the local file system - but // that's just a test case. 
- String copyURIStr = ctx.getExternalTmpFileURI(toURI); - URI copyURI = URI.create(copyURIStr); + URI copyURI = ctx.getExternalTmpFileURI(toURI).toUri(); rTask = TaskFactory.get(new CopyWork(new Path(fromURI), new Path(copyURI)), conf); fromURI = copyURI; } // create final load/move work - String loadTmpPath = ctx.getExternalTmpFileURI(toURI); + Path loadTmpPath = ctx.getExternalTmpFileURI(toURI); Map partSpec = ts.getPartSpec(); if (partSpec == null) { partSpec = new LinkedHashMap(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 848e0ab..4e40278 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1264,7 +1264,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException } try { fname = ctx.getExternalTmpFileURI( - FileUtils.makeQualified(location, conf).toUri()); + FileUtils.makeQualified(location, conf).toUri()).toString(); } catch (Exception e) { throw new SemanticException(generateErrorMessage(ast, "Error creating temporary folder on: " + location.toString()), e); @@ -1278,7 +1278,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException } } else { qb.setIsQuery(true); - fname = ctx.getMRTmpFileURI(); + fname = ctx.getMRTmpFileURI().toString(); ctx.setResDir(new Path(fname)); } } @@ -5264,7 +5264,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) Table dest_tab = null; // destination table if any Partition dest_part = null;// destination partition if any - String queryTmpdir = null; // the intermediate destination directory + Path queryTmpdir = null; // the intermediate destination directory Path dest_path = null; // the final destination directory TableDesc table_desc = null; int currentTableId = 0; @@ -5330,7 +5330,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) boolean isNonNativeTable = dest_tab.isNonNative(); if (isNonNativeTable) { - queryTmpdir = dest_path.toUri().getPath(); + queryTmpdir = dest_path; } else { queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri()); } @@ -5355,7 +5355,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) // Create the work for moving the table // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { - ltd = new LoadTableDesc(new Path(queryTmpdir),table_desc, dpCtx); + ltd = new LoadTableDesc(queryTmpdir,table_desc, dpCtx); ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); ltd.setLbCtx(lbCtx); @@ -5438,7 +5438,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(), dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(), dest_part.isStoredAsSubDirectories(), conf); - ltd = new LoadTableDesc(new Path(queryTmpdir), table_desc, dest_part.getSpec()); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec()); ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); ltd.setLbCtx(lbCtx); @@ -5558,7 +5558,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) } boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); - loadFileWork.add(new LoadFileDesc(tblDesc, new Path(queryTmpdir), dest_path, isDfsDir, cols, + loadFileWork.add(new LoadFileDesc(tblDesc, queryTmpdir, 
dest_path, isDfsDir, cols, colTypes)); if (tblDesc == null) { @@ -5638,7 +5638,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) // the same as directory name. The directory name // can be changed in the optimizer but the key should not be changed // it should be the same as the MoveWork's sourceDir. - fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName()); + fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString()); if (dest_part != null) { try { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 1fe5eeb..5fe21ed 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -51,7 +51,7 @@ public ConditionalResolverMergeFiles() { public static class ConditionalResolverMergeFilesCtx implements Serializable { private static final long serialVersionUID = 1L; List> listTasks; - private String dir; + private Path dir; private DynamicPartitionCtx dpCtx; // merge task could be after dynamic partition insert private ListBucketingCtx lbCtx; @@ -62,7 +62,7 @@ public ConditionalResolverMergeFilesCtx() { * @param dir */ public ConditionalResolverMergeFilesCtx( - List> listTasks, String dir) { + List> listTasks, Path dir) { this.listTasks = listTasks; this.dir = dir; } @@ -70,7 +70,7 @@ public ConditionalResolverMergeFilesCtx( /** * @return the dir */ - public String getDir() { + public Path getDir() { return dir; } @@ -78,7 +78,7 @@ public String getDir() { * @param dir * the dir to set */ - public void setDir(String dir) { + public void setDir(Path dir) { this.dir = dir; } @@ -123,7 +123,7 @@ public void setLbCtx(ListBucketingCtx lbCtx) { public List> getTasks(HiveConf conf, Object objCtx) { ConditionalResolverMergeFilesCtx ctx = (ConditionalResolverMergeFilesCtx) objCtx; - String dirName = ctx.getDir(); + Path dirName = ctx.getDir(); List> resTsks = new ArrayList>(); // check if a map-reduce job is needed to merge the files @@ -138,11 +138,10 @@ public void setLbCtx(ListBucketingCtx lbCtx) { Task mrAndMvTask = ctx.getListTasks().get(2); try { - Path dirPath = new Path(dirName); - FileSystem inpFs = dirPath.getFileSystem(conf); + FileSystem inpFs = dirName.getFileSystem(conf); DynamicPartitionCtx dpCtx = ctx.getDPCtx(); - if (inpFs.exists(dirPath)) { + if (inpFs.exists(dirName)) { // For each dynamic partition, check if it needs to be merged. 
         MapWork work;
         if (mrTask.getWork() instanceof MapredWork) {
@@ -169,11 +168,11 @@ public void setLbCtx(ListBucketingCtx lbCtx) {
           int dpLbLevel = numDPCols + lbLevel;
           generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
-              mrAndMvTask, dirPath, inpFs, ctx, work, dpLbLevel);
+              mrAndMvTask, dirName, inpFs, ctx, work, dpLbLevel);
         } else { // no dynamic partitions
           if(lbLevel == 0) {
             // static partition without list bucketing
-            long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize);
+            long totalSz = getMergeSize(inpFs, dirName, avgConditionSize);
             if (totalSz >= 0) { // add the merge job
               setupMapRedWork(conf, work, trgtSize, totalSz);
               resTsks.add(mrTask);
@@ -183,7 +182,7 @@ public void setLbCtx(ListBucketingCtx lbCtx) {
           } else { // static partition and list bucketing
             generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, mrTask,
-                mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel);
+                mrAndMvTask, dirName, inpFs, ctx, work, lbLevel);
           }
         }
       } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
index 184941f..eed9021 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
@@ -50,7 +50,7 @@
     // tables into corresponding different dirs (one dir per table).
     // this map stores mapping from "big key dir" to its corresponding mapjoin
     // task.
-    private HashMap<String, Task<? extends Serializable>> dirToTaskMap;
+    private HashMap<Path, Task<? extends Serializable>> dirToTaskMap;
     private Task<? extends Serializable> noSkewTask;
     /**
@@ -60,19 +60,19 @@ public ConditionalResolverSkewJoinCtx() {
     }
     public ConditionalResolverSkewJoinCtx(
-        HashMap<String, Task<? extends Serializable>> dirToTaskMap,
+        HashMap<Path, Task<? extends Serializable>> dirToTaskMap,
         Task<? extends Serializable> noSkewTask) {
       super();
       this.dirToTaskMap = dirToTaskMap;
       this.noSkewTask = noSkewTask;
     }
-    public HashMap<String, Task<? extends Serializable>> getDirToTaskMap() {
+    public HashMap<Path, Task<? extends Serializable>> getDirToTaskMap() {
      return dirToTaskMap;
    }
    public void setDirToTaskMap(
-        HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
+        HashMap<Path, Task<? extends Serializable>> dirToTaskMap) {
      this.dirToTaskMap = dirToTaskMap;
    }
@@ -94,16 +94,15 @@ public ConditionalResolverSkewJoin() {
     ConditionalResolverSkewJoinCtx ctx = (ConditionalResolverSkewJoinCtx) objCtx;
     List<Task<? extends Serializable>> resTsks = new ArrayList<Task<? extends Serializable>>();
-    Map<String, Task<? extends Serializable>> dirToTaskMap = ctx
+    Map<Path, Task<? extends Serializable>> dirToTaskMap = ctx
         .getDirToTaskMap();
-    Iterator<Entry<String, Task<? extends Serializable>>> bigKeysPathsIter = dirToTaskMap
+    Iterator<Entry<Path, Task<? extends Serializable>>> bigKeysPathsIter = dirToTaskMap
         .entrySet().iterator();
     try {
       while (bigKeysPathsIter.hasNext()) {
-        Entry<String, Task<? extends Serializable>> entry = bigKeysPathsIter
+        Entry<Path, Task<? extends Serializable>> entry = bigKeysPathsIter
             .next();
-        String path = entry.getKey();
-        Path dirPath = new Path(path);
+        Path dirPath = entry.getKey();
         FileSystem inpFs = dirPath.getFileSystem(conf);
         FileStatus[] fstatus = Utilities.listStatusIfExists(dirPath, inpFs);
         if (fstatus != null && fstatus.length > 0) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 29cfe9d..24db7d0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -39,7 +40,7 @@
   private int numDPCols; // number of dynamic partition columns
   private int numSPCols; // number of static partition columns
   private String spPath; //
path name corresponding to SP columns - private String rootPath; // the root path DP columns paths start from + private Path rootPath; // the root path DP columns paths start from private int numBuckets; // number of buckets in each partition private Map inputToDPCols; // mapping from input column names to DP columns @@ -128,11 +129,11 @@ public int getNumBuckets() { return this.numBuckets; } - public void setRootPath(String root) { + public void setRootPath(Path root) { this.rootPath = root; } - public String getRootPath() { + public Path getRootPath() { return this.rootPath; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 30b2411..747ac85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -30,7 +30,7 @@ @Explain(displayName = "File Output Operator") public class FileSinkDesc extends AbstractOperatorDesc { private static final long serialVersionUID = 1L; - private String dirName; + private Path dirName; // normally statsKeyPref will be the same as dirName, but the latter // could be changed in local execution optimization private String statsKeyPref; @@ -69,7 +69,7 @@ // the sub-queries write to sub-directories of a common directory. So, the file sink // descriptors for subq1 and subq2 are linked. private boolean linkedFileSink = false; - private String parentDir; + private Path parentDir; transient private List linkedFileSinkDesc; private boolean statsReliable; @@ -81,7 +81,7 @@ public FileSinkDesc() { } - public FileSinkDesc(final String dirName, final TableDesc tableInfo, + public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, final ArrayList partitionCols, final DynamicPartitionCtx dpCtx) { @@ -98,7 +98,7 @@ public FileSinkDesc(final String dirName, final TableDesc tableInfo, this.dpCtx = dpCtx; } - public FileSinkDesc(final String dirName, final TableDesc tableInfo, + public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed) { this.dirName = dirName; @@ -132,15 +132,15 @@ public Object clone() throws CloneNotSupportedException { } @Explain(displayName = "directory", normalExplain = false) - public String getDirName() { + public Path getDirName() { return dirName; } - public void setDirName(final String dirName) { + public void setDirName(final Path dirName) { this.dirName = dirName; } - public String getFinalDirName() { + public Path getFinalDirName() { return linkedFileSink ? parentDir : dirName; } @@ -320,11 +320,11 @@ public void setLinkedFileSink(boolean linkedFileSink) { this.linkedFileSink = linkedFileSink; } - public String getParentDir() { + public Path getParentDir() { return parentDir; } - public void setParentDir(String parentDir) { + public void setParentDir(Path parentDir) { this.parentDir = parentDir; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java index 14fced7..4729d61 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java @@ -29,6 +29,8 @@ import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.fs.Path; + /** * Map Join operator Descriptor implementation. 
 *
@@ -41,8 +43,8 @@
   // used to handle skew join
   private boolean handleSkewJoin = false;
   private int skewKeyDefinition = -1;
-  private Map<Byte, String> bigKeysDirMap;
-  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, Path> bigKeysDirMap;
+  private Map<Byte, Map<Byte, Path>> smallKeysDirMap;
   private Map<Byte, TableDesc> skewKeysValuesTables;
   // alias to key mapping
@@ -173,22 +175,22 @@ public void setSkewKeyDefinition(int skewKeyDefinition) {
   }
   @Override
-  public Map<Byte, String> getBigKeysDirMap() {
+  public Map<Byte, Path> getBigKeysDirMap() {
     return bigKeysDirMap;
   }
   @Override
-  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+  public void setBigKeysDirMap(Map<Byte, Path> bigKeysDirMap) {
     this.bigKeysDirMap = bigKeysDirMap;
   }
   @Override
-  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+  public Map<Byte, Map<Byte, Path>> getSmallKeysDirMap() {
     return smallKeysDirMap;
   }
   @Override
-  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, Path>> smallKeysDirMap) {
     this.smallKeysDirMap = smallKeysDirMap;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 4b9feac..2168811 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.fs.Path;
+
 /**
 * Join operator Descriptor implementation.
@@ -44,8 +46,8 @@
   // used to handle skew join
   private boolean handleSkewJoin = false;
   private int skewKeyDefinition = -1;
-  private Map<Byte, String> bigKeysDirMap;
-  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, Path> bigKeysDirMap;
+  private Map<Byte, Map<Byte, Path>> smallKeysDirMap;
   private Map<Byte, TableDesc> skewKeysValuesTables;
   // alias to key mapping
@@ -128,12 +130,12 @@ public Object clone() {
     }
     if (getBigKeysDirMap() != null) {
-      Map<Byte, String> cloneBigKeysDirMap = new HashMap<Byte, String>();
+      Map<Byte, Path> cloneBigKeysDirMap = new HashMap<Byte, Path>();
       cloneBigKeysDirMap.putAll(getBigKeysDirMap());
       ret.setBigKeysDirMap(cloneBigKeysDirMap);
     }
     if (getSmallKeysDirMap() != null) {
-      Map<Byte, Map<Byte, String>> cloneSmallKeysDirMap = new HashMap<Byte, Map<Byte, String>> ();
+      Map<Byte, Map<Byte, Path>> cloneSmallKeysDirMap = new HashMap<Byte, Map<Byte, Path>> ();
       cloneSmallKeysDirMap.putAll(getSmallKeysDirMap());
       ret.setSmallKeysDirMap(cloneSmallKeysDirMap);
     }
@@ -364,7 +366,7 @@ public void setHandleSkewJoin(boolean handleSkewJoin) {
   /**
   * @return mapping from tbl to dir for big keys.
   */
-  public Map<Byte, String> getBigKeysDirMap() {
+  public Map<Byte, Path> getBigKeysDirMap() {
     return bigKeysDirMap;
   }
@@ -373,14 +375,14 @@ public void setHandleSkewJoin(boolean handleSkewJoin) {
   *
   * @param bigKeysDirMap
   */
-  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+  public void setBigKeysDirMap(Map<Byte, Path> bigKeysDirMap) {
     this.bigKeysDirMap = bigKeysDirMap;
   }
   /**
   * @return mapping from tbl to dir for small keys
   */
-  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+  public Map<Byte, Map<Byte, Path>> getSmallKeysDirMap() {
     return smallKeysDirMap;
   }
@@ -389,7 +391,7 @@ public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
   *
   * @param smallKeysDirMap
   */
-  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, Path>> smallKeysDirMap) {
     this.smallKeysDirMap = smallKeysDirMap;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 9929275..de0bc86 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -77,17 +77,17 @@
   // If this map task has a FileSinkOperator, and bucketing/sorting metadata can be
   // inferred about the data being written by that operator, these are mappings from the directory
   // that operator writes into to the bucket/sort columns for that data.
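With the skew-join descriptors above keyed by Byte and valued by Path, job-close and resolver code can obtain the FileSystem directly from the stored value, which is what the JoinOperator and ConditionalResolverSkewJoin hunks in this patch rely on. A small self-contained sketch of that consumption pattern, offered as an aside; the directory name and alias byte are made up.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: shows how a Map<Byte, Path> of big-key directories can be walked
    // once the plan descriptors hold Path values instead of Strings.
    public final class BigKeyDirSketch {
      public static void main(String[] args) throws Exception {
        Map<Byte, Path> bigKeysDirMap = new HashMap<Byte, Path>();
        bigKeysDirMap.put((byte) 0, new Path("file:///tmp/hive/bigkeys/alias0"));

        Configuration conf = new Configuration();
        for (Map.Entry<Byte, Path> entry : bigKeysDirMap.entrySet()) {
          Path dir = entry.getValue();
          // The FileSystem comes straight from the stored Path; no new Path(String) round trip.
          FileSystem fs = dir.getFileSystem(conf);
          if (fs.exists(dir)) {
            FileStatus[] files = fs.listStatus(dir);
            System.out.println("alias " + entry.getKey() + ": " + files.length + " skewed file(s)");
          }
        }
      }
    }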
-  private final Map<String, List<BucketCol>> bucketedColsByDirectory =
-      new HashMap<String, List<BucketCol>>();
-  private final Map<String, List<SortCol>> sortedColsByDirectory =
-      new HashMap<String, List<SortCol>>();
+  private final Map<Path, List<BucketCol>> bucketedColsByDirectory =
+      new HashMap<Path, List<BucketCol>>();
+  private final Map<Path, List<SortCol>> sortedColsByDirectory =
+      new HashMap<Path, List<SortCol>>();
   private MapredLocalWork mapLocalWork;
-  private String tmpHDFSFileURI;
+  private Path tmpHDFSFileURI;
   private String inputformat;
-  private String indexIntermediateFile;
+  private Path indexIntermediateFile;
   private Integer numMapTasks;
   private Long maxSplitSize;
@@ -402,7 +402,7 @@ public void setHadoopSupportsSplittable(boolean hadoopSupportsSplittable) {
     this.hadoopSupportsSplittable = hadoopSupportsSplittable;
   }
-  public String getIndexIntermediateFile() {
+  public Path getIndexIntermediateFile() {
     return indexIntermediateFile;
   }
@@ -432,11 +432,11 @@ public void setOpParseCtxMap(
     this.opParseCtxMap = opParseCtxMap;
   }
-  public String getTmpHDFSFileURI() {
+  public Path getTmpHDFSFileURI() {
     return tmpHDFSFileURI;
   }
-  public void setTmpHDFSFileURI(String tmpHDFSFileURI) {
+  public void setTmpHDFSFileURI(Path tmpHDFSFileURI) {
     this.tmpHDFSFileURI = tmpHDFSFileURI;
   }
@@ -446,20 +446,20 @@ public void mergingInto(MapWork mapWork) {
   }
   @Explain(displayName = "Path -> Bucketed Columns", normalExplain = false)
-  public Map<String, List<BucketCol>> getBucketedColsByDirectory() {
+  public Map<Path, List<BucketCol>> getBucketedColsByDirectory() {
     return bucketedColsByDirectory;
   }
   @Explain(displayName = "Path -> Sorted Columns", normalExplain = false)
-  public Map<String, List<SortCol>> getSortedColsByDirectory() {
+  public Map<Path, List<SortCol>> getSortedColsByDirectory() {
     return sortedColsByDirectory;
   }
-  public void addIndexIntermediateFile(String fileName) {
+  public void addIndexIntermediateFile(Path fileName) {
     if (this.indexIntermediateFile == null) {
       this.indexIntermediateFile = fileName;
     } else {
-      this.indexIntermediateFile += "," + fileName;
+      this.indexIntermediateFile = new Path(indexIntermediateFile, "," + fileName);
     }
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index d4ad931..5991aae 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -196,8 +196,8 @@ private FilterDesc getTestFilterDesc(String column) {
   @SuppressWarnings("unchecked")
   private void populateMapPlan1(Table src) {
-    Operator<FileSinkDesc> op2 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator
-        + "mapplan1.out", Utilities.defaultTd, true));
+    Operator<FileSinkDesc> op2 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator
+        + "mapplan1.out"), Utilities.defaultTd, true));
     Operator<FilterDesc> op1 = OperatorFactory.get(getTestFilterDesc("key"), op2);
@@ -207,8 +207,8 @@ private void populateMapPlan1(Table src) {
   @SuppressWarnings("unchecked")
   private void populateMapPlan2(Table src) {
-    Operator<FileSinkDesc> op3 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator
-        + "mapplan2.out", Utilities.defaultTd, false));
+    Operator<FileSinkDesc> op3 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator
+        + "mapplan2.out"), Utilities.defaultTd, false));
     Operator<ScriptDesc> op2 = OperatorFactory.get(new ScriptDesc("cat",
         PlanUtils.getDefaultTableDesc("" + Utilities.tabCode, "key,value"),
@@ -243,8 +243,8 @@ private void populateMapRedPlan1(Table src) throws SemanticException {
     mr.setReduceWork(rWork);
     // reduce side work
-    Operator<FileSinkDesc> op3 = OperatorFactory.get(new
FileSinkDesc(new Path(tmpdir + File.separator + + "mapredplan1.out"), Utilities.defaultTd, false)); Operator op2 = OperatorFactory.get(new ExtractDesc( getStringColumn(Utilities.ReduceField.VALUE.toString())), op3); @@ -273,8 +273,8 @@ private void populateMapRedPlan2(Table src) throws SemanticException { mr.setReduceWork(rWork); // reduce side work - Operator op4 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator - + "mapredplan2.out", Utilities.defaultTd, false)); + Operator op4 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator + + "mapredplan2.out"), Utilities.defaultTd, false)); Operator op3 = OperatorFactory.get(getTestFilterDesc("0"), op4); @@ -317,8 +317,8 @@ private void populateMapRedPlan3(Table src, Table src2) throws SemanticException rWork.getTagToValueDesc().add(op2.getConf().getValueSerializeInfo()); // reduce side work - Operator op4 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator - + "mapredplan3.out", Utilities.defaultTd, false)); + Operator op4 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator + + "mapredplan3.out"), Utilities.defaultTd, false)); Operator op5 = OperatorFactory.get(new SelectDesc(Utilities .makeList(new ExprNodeFieldDesc(TypeInfoFactory.stringTypeInfo, @@ -360,8 +360,8 @@ private void populateMapRedPlan4(Table src) throws SemanticException { mr.setReduceWork(rWork); // reduce side work - Operator op3 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator - + "mapredplan4.out", Utilities.defaultTd, false)); + Operator op3 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator + + "mapredplan4.out"), Utilities.defaultTd, false)); Operator op2 = OperatorFactory.get(new ExtractDesc( getStringColumn(Utilities.ReduceField.VALUE.toString())), op3); @@ -399,8 +399,8 @@ private void populateMapRedPlan5(Table src) throws SemanticException { rWork.getTagToValueDesc().add(op0.getConf().getValueSerializeInfo()); // reduce side work - Operator op3 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator - + "mapredplan5.out", Utilities.defaultTd, false)); + Operator op3 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator + + "mapredplan5.out"), Utilities.defaultTd, false)); Operator op2 = OperatorFactory.get(new ExtractDesc( getStringColumn(Utilities.ReduceField.VALUE.toString())), op3); @@ -440,8 +440,8 @@ private void populateMapRedPlan6(Table src) throws SemanticException { rWork.getTagToValueDesc().add(op1.getConf().getValueSerializeInfo()); // reduce side work - Operator op3 = OperatorFactory.get(new FileSinkDesc(tmpdir + File.separator - + "mapredplan6.out", Utilities.defaultTd, false)); + Operator op3 = OperatorFactory.get(new FileSinkDesc(new Path(tmpdir + File.separator + + "mapredplan6.out"), Utilities.defaultTd, false)); Operator op2 = OperatorFactory.get(getTestFilterDesc("0"), op3); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java index f1e8555..1364888 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java @@ -25,6 +25,7 @@ import junit.framework.TestCase; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -89,8 +90,8 @@ public void testPlan() throws Exception { // store into configuration job.set("fs.default.name", "file:///"); - 
Utilities.setMapRedWork(job, mrwork, System.getProperty("java.io.tmpdir") + File.separator + - System.getProperty("user.name") + File.separator + "hive"); + Utilities.setMapRedWork(job, mrwork, new Path(System.getProperty("java.io.tmpdir") + File.separator + + System.getProperty("user.name") + File.separator + "hive")); MapredWork mrwork2 = Utilities.getMapRedWork(job); Utilities.clearWork(job); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java index 1047a8d..728a9dd 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java @@ -140,7 +140,7 @@ private void init() throws IOException { pt.put("/tmp/testfolder", partDesc); MapredWork mrwork = new MapredWork(); mrwork.getMapWork().setPathToPartitionInfo(pt); - Utilities.setMapRedWork(conf, mrwork,"/tmp/" + System.getProperty("user.name") + "/hive"); + Utilities.setMapRedWork(conf, mrwork, new Path("/tmp/" + System.getProperty("user.name") + "/hive")); hiveSplit = new TestHiveInputSplit(); hbsReader = new TestHiveRecordReader(rcfReader, conf); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java index 94061af..acf011c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java @@ -81,7 +81,7 @@ protected void setUp() throws IOException { pt.put("/tmp/testfolder", partDesc); MapredWork mrwork = new MapredWork(); mrwork.getMapWork().setPathToPartitionInfo(pt); - Utilities.setMapRedWork(job, mrwork,"/tmp/" + System.getProperty("user.name") + "/hive"); + Utilities.setMapRedWork(job, mrwork, new Path("/tmp/" + System.getProperty("user.name") + "/hive")); fileSystem = FileSystem.getLocal(conf); testDir = new Path(System.getProperty("test.tmp.dir", System.getProperty( @@ -169,18 +169,16 @@ public void testCombine() throws Exception { drv.compile(cmd); //create scratch dir - String emptyScratchDirStr; Path emptyScratchDir; Context ctx = new Context(newJob); - emptyScratchDirStr = ctx.getMRTmpFileURI(); - emptyScratchDir = new Path(emptyScratchDirStr); + emptyScratchDir = ctx.getMRTmpFileURI(); FileSystem fileSys = emptyScratchDir.getFileSystem(newJob); fileSys.mkdirs(emptyScratchDir); QueryPlan plan = drv.getPlan(); MapRedTask selectTask = (MapRedTask)plan.getRootTasks().get(0); - List inputPaths = Utilities.getInputPaths(newJob, selectTask.getWork().getMapWork(), emptyScratchDir.toString(), ctx); + List inputPaths = Utilities.getInputPaths(newJob, selectTask.getWork().getMapWork(), emptyScratchDir, ctx); Utilities.setInputPaths(newJob, inputPaths); Utilities.setMapRedWork(newJob, selectTask.getWork(), ctx.getMRTmpFileURI());
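The test updates above simply wrap the existing string concatenation in new Path(...). For reference, an equivalent separator-safe spelling uses the Path(parent, child) constructor; the helper class below is hypothetical and not part of the patch.

    import org.apache.hadoop.fs.Path;

    // Illustrative only: builds the same scratch location the tests compose with
    // File.separator, but via nested Path(parent, child) calls.
    public final class ScratchDirSketch {
      public static Path testScratchDir() {
        Path tmp = new Path(System.getProperty("java.io.tmpdir"));
        return new Path(new Path(tmp, System.getProperty("user.name")), "hive");
      }

      public static void main(String[] args) {
        // Prints e.g. /tmp/<user>/hive, matching the string built in TestPlan above.
        System.out.println(testScratchDir());
      }
    }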