diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 8beef09..699c7bf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -103,7 +103,7 @@ protected void setUp() {
         db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, false);
+        db.loadTable(hadoopDataFile[i], src, false, false, false);
         i++;
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index ed7787d..37c8570 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -249,13 +249,13 @@ public int execute(DriverContext driverContext) {
           // Get all files from the src directory
           FileStatus[] dirs;
           ArrayList<FileStatus> files;
-          FileSystem fs;
+          FileSystem srcFs; // source filesystem
          try {
-            fs = table.getDataLocation().getFileSystem(conf);
-            dirs = fs.globStatus(tbd.getSourcePath());
+            srcFs = tbd.getSourcePath().getFileSystem(conf);
+            dirs = srcFs.globStatus(tbd.getSourcePath());
             files = new ArrayList<FileStatus>();
             for (int i = 0; (dirs != null && i < dirs.length); i++) {
-              files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
+              files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath())));
               // We only check one file, so exit the loop when we have at least
               // one.
               if (files.size() > 0) {
@@ -269,7 +269,7 @@ public int execute(DriverContext driverContext) {
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
           // Check if the file format of the file matches that of the table.
           boolean flag = HiveFileFormatUtils.checkInputFormat(
-              fs, conf, tbd.getTable().getInputFileFormatClass(), files);
+              srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
           if (!flag) {
             throw new HiveException(
                 "Wrong file format. Please check the file's format.");
@@ -282,7 +282,7 @@ public int execute(DriverContext driverContext) {
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
           db.loadTable(tbd.getSourcePath(), tbd.getTable()
-              .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime());
+              .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime(), work.isSrcLocal());
           if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(table));
           }
@@ -407,11 +407,13 @@ public int execute(DriverContext driverContext) {
             db.validatePartitionNameCharacters(partVals);
             db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
-                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd));
-            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal());
+            Partition partn = db.getPartition(table, tbd.getPartitionSpec(),
+                false);

-            if (bucketCols != null || sortCols != null) {
-              updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols);
+            if (bucketCols != null || sortCols != null) {
+              updatePartitionBucketSortColumns(table, partn, bucketCols,
+                  numBuckets, sortCols);
             }

             dc = new DataContainer(table.getTTable(), partn.getTPartition());
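Reviewer note on the MoveTask hunks above: the file-format check used to obtain its FileSystem handle from the table's data location, so listing a LOCAL source through that handle failed whenever source and warehouse lived on different filesystems. The patch derives the filesystem from the source path instead. A minimal standalone sketch of the Hadoop behavior this relies on; the class name, paths, and namenode address are hypothetical, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsResolutionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A local source and an HDFS warehouse location resolve to different
    // FileSystem implementations; listing one through the other's handle
    // fails with a "Wrong FS" error.
    Path localSrc = new Path("file:///tmp/kv1.txt");             // hypothetical
    Path warehouse = new Path("hdfs://namenode:8020/warehouse"); // hypothetical
    FileSystem srcFs = localSrc.getFileSystem(conf);   // LocalFileSystem
    FileSystem destFs = warehouse.getFileSystem(conf); // DistributedFileSystem
    System.out.println(srcFs.getUri() + " != " + destFs.getUri());
  }
}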
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e59decc..ee10842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1180,11 +1180,13 @@ public Database getDatabaseCurrent() throws HiveException {
    * @param holdDDLTime if true, force [re]create the partition
    * @param inheritTableSpecs if true, on [re]creating the partition, take the
    *          location/inputformat/outputformat/serde details from table spec
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
    */
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
-      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir)
-      throws HiveException {
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal) throws HiveException {
     Table tbl = getTable(tableName);
     try {
       /**
@@ -1227,10 +1229,11 @@ public void loadPartition(Path loadPath, String tableName,
       }

       if (replace) {
-        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf());
+        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf(),
+            isSrcLocal);
       } else {
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs);
+        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal);
       }

       // recreate the partition if it existed before
@@ -1418,7 +1421,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // finally load the partition -- move the file to the final table address
       loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
-          listBucketingEnabled);
+          listBucketingEnabled, false);
       LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
     }
     return fullPartSpecs;
@@ -1440,15 +1443,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param replace
    *          if true - replace files in the table, otherwise add files to table
    * @param holdDDLTime
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
    */
   public void loadTable(Path loadPath, String tableName, boolean replace,
-      boolean holdDDLTime) throws HiveException {
+      boolean holdDDLTime, boolean isSrcLocal) throws HiveException {
     Table tbl = getTable(tableName);
-
     if (replace) {
-      tbl.replaceFiles(loadPath);
+      tbl.replaceFiles(loadPath, isSrcLocal);
     } else {
-      tbl.copyFiles(loadPath);
+      tbl.copyFiles(loadPath, isSrcLocal);
     }

     if (!holdDDLTime) {
@@ -2070,9 +2074,9 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType,
   }

   // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest
-  private static List<List<Path[]>> checkPaths(HiveConf conf,
-      FileSystem fs, FileStatus[] srcs, Path destf,
-      boolean replace) throws HiveException {
+  private static List<List<Path[]>> checkPaths(HiveConf conf, FileSystem fs,
+      FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace)
+      throws HiveException {

     List<List<Path[]>> result = new ArrayList<List<Path[]>>();
     try {
@@ -2084,7 +2088,7 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType,
       for (FileStatus src : srcs) {
         FileStatus[] items;
         if (src.isDir()) {
-          items = fs.listStatus(src.getPath());
+          items = srcFs.listStatus(src.getPath());
           Arrays.sort(items);
         } else {
           items = new FileStatus[] {src};
@@ -2099,7 +2103,7 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType,
             // This check is redundant because temp files are removed by
             // execution layer before
             // calling loadTable/Partition. But leaving it in just in case.
-            fs.delete(itemSource, true);
+            srcFs.delete(itemSource, true);
             continue;
           }
@@ -2161,8 +2165,8 @@ private static boolean destExists(List<List<Path[]>> result, Path proposed) {
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
-  static protected boolean renameFile(HiveConf conf, Path srcf, Path destf, FileSystem fs,
-      boolean replace) throws HiveException {
+  static protected boolean renameFile(HiveConf conf, Path srcf, Path destf,
+      FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException {
     boolean success = false;
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
@@ -2194,11 +2198,18 @@ static protected boolean renameFile(HiveConf conf, Path srcf, Path destf, FileSy
           }
         }
       }
-      success = fs.rename(srcf, destf);
+      if (!isSrcLocal) {
+        // For NOT local src file, rename the file
+        success = fs.rename(srcf, destf);
+      } else {
+        // For local src file, copy to hdfs
+        fs.copyFromLocalFile(srcf, destf);
+        success = true;
+      }
       LOG.info((replace ? "Replacing src:" : "Renaming src:") + srcf.toString()
           + ";dest: " + destf.toString() + ";Status:" + success);
     } catch (IOException ioe) {
-      throw new HiveException("Unable to move source" + srcf + " to destination " + destf, ioe);
+      throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
     }

     if (success && inheritPerms) {
@@ -2215,8 +2226,8 @@ static protected boolean renameFile(HiveConf conf, Path srcf, Path destf, FileSy
     return success;
   }

-  static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs)
-      throws HiveException {
+  static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
+      FileSystem fs, boolean isSrcLocal) throws HiveException {
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
@@ -2234,8 +2245,10 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem
     }

     FileStatus[] srcs;
+    FileSystem srcFs;
     try {
-      srcs = fs.globStatus(srcf);
+      srcFs = srcf.getFileSystem(conf);
+      srcs = srcFs.globStatus(srcf);
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException("addFiles: filesystem error in check phase", e);
@@ -2246,14 +2259,14 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem
       // srcs = new FileStatus[0]; Why is this needed?
     }
     // check that source and target paths exist
-    List<List<Path[]>> result = checkPaths(conf, fs, srcs, destf, false);
-
+    List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
     // move it, move it
     try {
       for (List<Path[]> sdpairs : result) {
         for (Path[] sdpair : sdpairs) {
-          if (!renameFile(conf, sdpair[0], sdpair[1], fs, false)) {
-            throw new IOException("Cannot move " + sdpair[0] + " to " + sdpair[1]);
+          if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+            throw new IOException("Cannot move " + sdpair[0] + " to "
+                + sdpair[1]);
           }
         }
       }
@@ -2276,18 +2289,22 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem
    *          The directory where the final data needs to go
    * @param oldPath
    *          The directory where the old data location, need to be cleaned up.
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
    */
-  static protected void replaceFiles(Path srcf, Path destf, Path oldPath, HiveConf conf)
-      throws HiveException {
+  static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
+      HiveConf conf, boolean isSrcLocal) throws HiveException {
     try {
-      FileSystem fs = srcf.getFileSystem(conf);
+      FileSystem destFs = destf.getFileSystem(conf);
       boolean inheritPerms = HiveConf.getBoolVar(conf,
           HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);

       // check if srcf contains nested sub-directories
       FileStatus[] srcs;
+      FileSystem srcFs;
       try {
-        srcs = fs.globStatus(srcf);
+        srcFs = srcf.getFileSystem(conf);
+        srcs = srcFs.globStatus(srcf);
       } catch (IOException e) {
         throw new HiveException("Getting globStatus " + srcf.toString(), e);
       }
@@ -2295,7 +2312,8 @@ static protected void replaceFiles(Path srcf, Path destf, Path oldPath, HiveConf
       LOG.info("No sources specified to move: " + srcf);
       return;
     }
-    List<List<Path[]>> result = checkPaths(conf, fs, srcs, destf, true);
+    List<List<Path[]>> result = checkPaths(conf, destFs, srcs, srcFs, destf,
+        true);

     if (oldPath != null) {
       try {
@@ -2316,35 +2334,37 @@ static protected void replaceFiles(Path srcf, Path destf, Path oldPath, HiveConf
       if (srcs.length == 1 && srcs[0].isDir()) {
         // rename can fail if the parent doesn't exist
         Path destfp = destf.getParent();
-        if (!fs.exists(destfp)) {
-          boolean success = fs.mkdirs(destfp);
+        if (!destFs.exists(destfp)) {
+          boolean success = destFs.mkdirs(destfp);
           if (!success) {
             LOG.warn("Error creating directory " + destf.toString());
           }
           if (inheritPerms && success) {
-            fs.setPermission(destfp, fs.getFileStatus(destfp.getParent()).getPermission());
+            destFs.setPermission(destfp, destFs.getFileStatus(destfp.getParent()).getPermission());
           }
         }

-        boolean b = renameFile(conf, srcs[0].getPath(), destf, fs, true);
+        boolean b = renameFile(conf, srcs[0].getPath(), destf, destFs, true,
+            isSrcLocal);
         if (!b) {
           throw new HiveException("Unable to move results from " + srcs[0].getPath()
               + " to destination directory: " + destf);
         }
       } else { // srcf is a file or pattern containing wildcards
-        if (!fs.exists(destf)) {
-          boolean success = fs.mkdirs(destf);
+        if (!destFs.exists(destf)) {
+          boolean success = destFs.mkdirs(destf);
           if (!success) {
             LOG.warn("Error creating directory " + destf.toString());
           }
           if (inheritPerms && success) {
-            fs.setPermission(destf, fs.getFileStatus(destf.getParent()).getPermission());
+            destFs.setPermission(destf, destFs.getFileStatus(destf.getParent()).getPermission());
           }
         }
         // srcs must be a list of files -- ensured by LoadSemanticAnalyzer
         for (List<Path[]> sdpairs : result) {
           for (Path[] sdpair : sdpairs) {
-            if (!renameFile(conf, sdpair[0], sdpair[1], fs, true)) {
+            if (!renameFile(conf, sdpair[0], sdpair[1], destFs, true,
+                isSrcLocal)) {
               throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
             }
           }
         }
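Reviewer note on the Hive.java hunks above: the essential behavioral change sits in renameFile. FileSystem.rename cannot move data across filesystems, so when the source is local the patch copies the bytes up with copyFromLocalFile and reports success. (One fix applied while reconstructing this hunk: replaceFiles now globs the source pattern on srcFs rather than destFs, since globbing a file:// path through the destination filesystem would fail with a "Wrong FS" error.) A condensed, self-contained sketch of the move-or-copy decision; MoveSketch and moveToDest are hypothetical names, and the real method wraps this logic in permission-inheritance and logging code:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class MoveSketch {
  // Condensed form of the branch added to Hive.renameFile.
  static boolean moveToDest(FileSystem destFs, Path src, Path dest,
      boolean isSrcLocal) throws IOException {
    if (!isSrcLocal) {
      // Source already on the destination filesystem: a cheap metadata move.
      return destFs.rename(src, dest);
    }
    // Local source: rename cannot cross filesystems, so copy the bytes up.
    destFs.copyFromLocalFile(src, dest);
    return true;
  }
}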
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index d13d0b0..daf6cba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -649,10 +648,14 @@ public int getNumBuckets() {
    *
    * @param srcf
    *          Source directory
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
    */
-  protected void replaceFiles(Path srcf) throws HiveException {
+  protected void replaceFiles(Path srcf, boolean isSrcLocal)
+      throws HiveException {
     Path tableDest = getPath();
-    Hive.replaceFiles(srcf, tableDest, tableDest, Hive.get().getConf());
+    Hive.replaceFiles(srcf, tableDest, tableDest, Hive.get().getConf(),
+        isSrcLocal);
   }

   /**
@@ -660,12 +663,14 @@ protected void replaceFiles(Path srcf) throws HiveException {
    *
    * @param srcf
    *          Files to be moved. Leaf directories or globbed file paths
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
    */
-  protected void copyFiles(Path srcf) throws HiveException {
+  protected void copyFiles(Path srcf, boolean isSrcLocal) throws HiveException {
     FileSystem fs;
     try {
       fs = getDataLocation().getFileSystem(Hive.get().getConf());
-      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs);
+      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index a22a15f..adc596a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -220,18 +219,6 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {

     Task<? extends Serializable> rTask = null;

-    // create copy work
-    if (isLocal) {
-      // if the local keyword is specified - we will always make a copy. this
-      // might seem redundant in the case
-      // that the hive warehouse is also located in the local file system - but
-      // that's just a test case.
-      String copyURIStr = ctx.getExternalTmpPath(toURI).toString();
-      URI copyURI = URI.create(copyURIStr);
-      rTask = TaskFactory.get(new CopyWork(new Path(fromURI), new Path(copyURI)), conf);
-      fromURI = copyURI;
-    }
-
     // create final load/move work

     Map<String, String> partSpec = ts.getPartSpec();
@@ -261,7 +248,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
         Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);

     Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
-        getOutputs(), loadTableWork, null, true), conf);
+        getOutputs(), loadTableWork, null, true, isLocal), conf);
     if (rTask != null) {
       rTask.addDependentTask(childTask);
     } else {
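Reviewer note on the LoadSemanticAnalyzer hunk above: since renameFile can now pull local files directly, the analyzer no longer stages a CopyTask ahead of the MoveTask; it simply records the LOCAL keyword on the MoveWork it emits, using the new six-argument constructor added in the next file. A sketch of how a caller builds such a plan node, assuming the patched MoveWork is on the classpath; MoveWorkSketch and localLoad are hypothetical names:

import java.util.HashSet;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public class MoveWorkSketch {
  // Builds the plan node for LOAD DATA LOCAL: no CopyTask, just a MoveWork
  // flagged srcLocal = true (checkFileFormat = true, as before the patch).
  public static MoveWork localLoad(HashSet<ReadEntity> inputs,
      HashSet<WriteEntity> outputs, LoadTableDesc loadTableWork) {
    return new MoveWork(inputs, outputs, loadTableWork, null, true, true);
  }
}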
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 407450e..e43156f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -38,6 +38,7 @@
   private LoadMultiFilesDesc loadMultiFilesWork;

   private boolean checkFileFormat;
+  private boolean srcLocal;

   /**
    * ReadEntitites that are passed to the hooks.
@@ -63,6 +64,16 @@ public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
+      boolean checkFileFormat, boolean srcLocal) {
+    this(inputs, outputs);
+    this.loadTableWork = loadTableWork;
+    this.loadFileWork = loadFileWork;
+    this.checkFileFormat = checkFileFormat;
+    this.srcLocal = srcLocal;
+  }
+
+  public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
       boolean checkFileFormat) {
     this(inputs, outputs);
     this.loadTableWork = loadTableWork;
@@ -121,4 +132,12 @@ public void setOutputs(HashSet<WriteEntity> outputs) {
     this.outputs = outputs;
   }

+  public boolean isSrcLocal() {
+    return srcLocal;
+  }
+
+  public void setSrcLocal(boolean srcLocal) {
+    this.srcLocal = srcLocal;
+  }
+
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index 5991aae..91efb58 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -125,7 +125,7 @@
       db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
       db.createTable(src, cols, null, TextInputFormat.class,
           IgnoreKeyTextOutputFormat.class);
-      db.loadTable(hadoopDataFile[i], src, false, false);
+      db.loadTable(hadoopDataFile[i], src, false, false, true);
       i++;
     }
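Reviewer note, caller's-eye view: both test files now pass the extra isSrcLocal flag explicitly (false in TestHiveHistory, true in TestExecDriver). A hedged usage sketch of the widened loadTable API, assuming this patch is applied; the class name, table name, and file path are illustrative only:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.Hive;

public class LoadTableSketch {
  public static void main(String[] args) throws Exception {
    Hive db = Hive.get(); // thread-local Hive bound to the current HiveConf
    Path localFile = new Path("file:///tmp/kv1.txt"); // hypothetical data file
    // replace = false, holdDDLTime = false, isSrcLocal = true:
    // the bytes are copied up via copyFromLocalFile instead of renamed.
    db.loadTable(localFile, "src", false, false, true);
  }
}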