Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java	(revision 1023029)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java	(working copy)
@@ -39,8 +39,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
@@ -905,44 +907,30 @@
        * processes might move forward with partial data
        */
 
-      FileSystem fs;
-      Path partPath;
-
-      // check if partition exists without creating it
-      Partition part = getPartition(tbl, partSpec, false);
-      if (part == null) {
-        // Partition does not exist currently. The partition name is
-        // extrapolated from
-        // the table's location (even if the table is marked external)
-        fs = FileSystem.get(tbl.getDataLocation(), getConf());
-        partPath = new Path(tbl.getDataLocation().getPath(),
-            Warehouse.makePartName(partSpec));
-      } else {
-        // Partition exists already. Get the path from the partition. This will
-        // get the default path for Hive created partitions or the external path
-        // when directly created by user
-        partPath = part.getPath()[0];
-        fs = partPath.getFileSystem(getConf());
+      Partition oldPart = getPartition(tbl, partSpec, false, null);
+      Path oldPartPath = null;
+      if (oldPart != null) {
+        oldPartPath = oldPart.getPartitionPath();
       }
 
+      Path newPartPath = new Path(loadPath.toUri().getScheme(), loadPath
+          .toUri().getAuthority(), oldPartPath.toUri().getPath());
+
       if (replace) {
-        Hive.replaceFiles(loadPath, partPath, fs, tmpDirPath);
+        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, tmpDirPath, getConf());
       } else {
-        Hive.copyFiles(loadPath, partPath, fs);
+        FileSystem fs = FileSystem.get(tbl.getDataLocation(), getConf());
+        Hive.copyFiles(loadPath, newPartPath, fs);
       }
 
       // recreate the partition if it existed before
       if (!holdDDLTime) {
-        part = getPartition(tbl, partSpec, true);
+        getPartition(tbl, partSpec, true, newPartPath.toString());
       }
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
-    } catch (MetaException e) {
-      LOG.error(StringUtils.stringifyException(e));
-      throw new HiveException(e);
     }
-
   }
 
   /**
@@ -1090,6 +1078,11 @@
     return new Partition(tbl, partition);
   }
 
+  public Partition getPartition(Table tbl, Map<String, String> partSpec,
+      boolean forceCreate) throws HiveException {
+    return getPartition(tbl, partSpec, forceCreate, null);
+  }
+
   /**
    * Returns partition metadata
@@ -1105,7 +1098,7 @@
    * @throws HiveException
    */
   public Partition getPartition(Table tbl, Map<String, String> partSpec,
-      boolean forceCreate) throws HiveException {
+      boolean forceCreate, String partPath) throws HiveException {
     if (!tbl.isValidSpec(partSpec)) {
       throw new HiveException("Invalid partition: " + partSpec);
     }
@@ -1147,6 +1140,10 @@
       tpart.getSd().setOutputFormat(tbl.getTTable().getSd().getOutputFormat());
       tpart.getSd().setInputFormat(tbl.getTTable().getSd().getInputFormat());
       tpart.getSd().getSerdeInfo().setSerializationLib(tbl.getSerializationLib());
+      if (partPath == null || partPath.trim().equals("")) {
+        throw new HiveException("new partition path should not be null or empty.");
+      }
+      tpart.getSd().setLocation(partPath);
       alterPartition(tbl.getTableName(), new Partition(tbl,
           tpart));
     }
   }
@@ -1443,19 +1440,31 @@
 
   /**
    * Replaces files in the partition with new data set specified by srcf. Works
-   * by moving files
+   * by moving files.
+   * srcf, destf, and tmppath should reside on the same DFS, but oldPath can
+   * be on a different DFS.
    *
    * @param srcf
    *          Files to be moved. Leaf Directories or Globbed File Paths
    * @param destf
    *          The directory where the final data needs to go
-   * @param fs
-   *          The filesystem handle
+   * @param oldPath
+   *          The location of the old data, which needs to be cleaned up.
    * @param tmppath
   *          Temporary directory
    */
-  static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
-      Path tmppath) throws HiveException {
+  static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
+      Path tmppath, Configuration conf) throws HiveException {
+
+    FileSystem fs = null;
+    FsShell fshell = new FsShell();
+    fshell.setConf(conf);
+    try {
+      fs = FileSystem.get(srcf.toUri(), conf);
+    } catch (IOException e1) {
+      throw new HiveException(e1.getMessage(), e1);
+    }
+
     FileStatus[] srcs;
     try {
       srcs = fs.listStatus(srcf);
@@ -1483,8 +1492,17 @@
       }
 
       // point of no return
-      boolean b = fs.delete(destf, true);
-      LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
+      if (oldPath != null) {
+        try {
+          fshell.run(new String[]{"-rmr", oldPath.toUri().toString()});
+        } catch (Exception e) {
+          // swallow the exception
+        }
+      }
+      try {
+        fshell.run(new String[]{"-rmr", destf.toUri().toString()});
+      } catch (Exception e) {
+      }
 
       // create the parent directory otherwise rename can fail if the parent
       // doesn't exist
@@ -1493,13 +1511,12 @@
             + destf.getParent().toString());
       }
 
-      b = fs.rename(tmppath, destf);
+      boolean b = fs.rename(tmppath, destf);
       if (!b) {
         throw new HiveException("Unable to move results from " + tmppath
             + " to destination directory: " + destf.getParent().toString());
       }
       LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
-
     } catch (IOException e) {
       throw new HiveException("replaceFiles: error while moving files from "
           + tmppath + " to " + destf + "!!!", e);
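Note on the replaceFiles() hunks above: the old code deleted destf directly through a bound FileSystem handle, which only works when every path lives on that one filesystem. The patched code shells out through FsShell instead, which resolves each argument against its own filesystem (so oldPath may sit on a different DFS) and, in Hadoop versions of this era, lets "-rmr" move the old data to the trash when fs.trash.interval is enabled rather than destroying it. A minimal standalone sketch of that idiom follows; the class and method names are illustrative, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;

public class TrashDeleteSketch {

  // Delete a directory the way the patched replaceFiles() does: via FsShell,
  // so the path's own filesystem is used and -rmr honors the trash setting.
  static void deleteViaShell(Configuration conf, Path dir) {
    FsShell fshell = new FsShell();
    fshell.setConf(conf);
    try {
      // Equivalent to "hadoop fs -rmr <dir>"; dir may be on a filesystem
      // other than the default one configured in conf.
      fshell.run(new String[] { "-rmr", dir.toUri().toString() });
    } catch (Exception e) {
      // The patch deliberately swallows failures here: a missing old
      // directory should not abort the load.
    }
  }

  public static void main(String[] args) {
    deleteViaShell(new Configuration(),
        new Path("hdfs://nn1:8020/user/hive/warehouse/t/ds=1"));
  }
}

One caveat worth flagging in the loadPartition() hunk: newPartPath is derived from oldPartPath, which is only non-null when the partition already exists. A first-time load into a new partition would appear to hit a NullPointerException unless a fallback along the lines of the removed Warehouse.makePartName() branch is restored.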
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java	(revision 1022214)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java	(working copy)
@@ -524,13 +524,8 @@
    *          Temporary directory
    */
   protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
-      Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
+    Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), null, tmpd,
+        Hive.get().getConf());
   }
 
   /**
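Table.replaceFiles() passes null for oldPath because a table-level load replaces data at the table's own location, on the same filesystem as destf; there is no separately located old partition directory to clean up. The contract stated in the new javadoc is that srcf, destf, and tmppath share one DFS while oldPath may differ. The patch never checks this explicitly, but a hypothetical guard makes clear what "same dfs" means in terms of the path URIs (sameFileSystem is an invented helper, not in the patch):

import java.net.URI;

import org.apache.hadoop.fs.Path;

public class SameFsCheckSketch {

  // Two paths are on the "same dfs" when scheme and authority match; this is
  // the invariant replaceFiles() relies on for srcf, destf, and tmppath,
  // since fs.rename() cannot cross filesystems.
  static boolean sameFileSystem(Path a, Path b) {
    URI ua = a.toUri();
    URI ub = b.toUri();
    return eq(ua.getScheme(), ub.getScheme())
        && eq(ua.getAuthority(), ub.getAuthority());
  }

  private static boolean eq(String x, String y) {
    return x == null ? y == null : x.equals(y);
  }

  public static void main(String[] args) {
    Path srcf = new Path("hdfs://nn2:8020/tmp/hive-scratch/000000_0");
    Path destf = new Path("hdfs://nn2:8020/user/hive/warehouse/t/ds=1");
    Path oldPath = new Path("hdfs://nn1:8020/other/warehouse/t/ds=1");
    System.out.println(sameFileSystem(srcf, destf));   // true: rename is safe
    System.out.println(sameFileSystem(srcf, oldPath)); // false: FsShell only
  }
}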
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 1022214)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -3435,8 +3435,14 @@
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
+      Path tabPath = dest_tab.getPath();
+      Path partPath = dest_part.getPartitionPath();
+
+      // if the table is on a different dfs than the partition,
+      // replace the partition's dfs with the table's dfs.
+      dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
+          .getAuthority(), partPath.toUri().getPath());
 
-      dest_path = dest_part.getPath()[0];
       if ("har".equalsIgnoreCase(dest_path.toUri().getScheme())) {
         throw new SemanticException(ErrorMsg.OVERWRITE_ARCHIVED_PART
             .getMsg());
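The SemanticAnalyzer hunk applies at compile time the same rewrite loadPartition() applies at load time: keep the partition's directory path, but re-root it on the table's filesystem, so INSERT OVERWRITE writes to the table's DFS even when the partition metadata still points at a different one. A minimal sketch of that idiom, assuming only hadoop-common on the classpath (rebaseOnFileSystem is an illustrative name, not a method in the patch):

import java.net.URI;

import org.apache.hadoop.fs.Path;

public class PathRebaseSketch {

  // Re-root `location` on the filesystem of `base`: scheme and authority are
  // taken from base, the directory part of location is kept as-is. Both
  // loadPartition() and the SemanticAnalyzer hunk above use this pattern.
  static Path rebaseOnFileSystem(Path base, Path location) {
    URI baseUri = base.toUri();
    return new Path(baseUri.getScheme(), baseUri.getAuthority(),
        location.toUri().getPath());
  }

  public static void main(String[] args) {
    Path tabPath = new Path("hdfs://nn2:8020/user/hive/warehouse/t");
    Path partPath = new Path("hdfs://nn1:8020/user/hive/warehouse/t/ds=1");
    // Prints hdfs://nn2:8020/user/hive/warehouse/t/ds=1
    System.out.println(rebaseOnFileSystem(tabPath, partPath));
  }
}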