Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1022214)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -259,6 +259,8 @@
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+
+ HIVEPARTITIONUSETBLDFS("hive.exec.partition.usetable.dfs", false),
HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1022214)
+++ conf/hive-default.xml (working copy)
@@ -73,6 +73,13 @@
+<property>
+  <name>hive.exec.partition.usetable.dfs</name>
+  <value>false</value>
+  <description>When overwriting a partition's data, this config decides which dfs
+  (the dfs in the table metadata or the partition's old dfs) the new data is written to.</description>
+</property>
+
<property>
  <name>hive.metastore.local</name>
  <value>true</value>
  <description>controls whether to connect to remove metastore server or open a new metastore server in Hive Client JVM</description>
</property>
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 1023029)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -39,8 +39,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaException;
@@ -905,44 +907,30 @@
* processes might move forward with partial data
*/
- FileSystem fs;
- Path partPath;
-
- // check if partition exists without creating it
- Partition part = getPartition(tbl, partSpec, false);
- if (part == null) {
- // Partition does not exist currently. The partition name is
- // extrapolated from
- // the table's location (even if the table is marked external)
- fs = FileSystem.get(tbl.getDataLocation(), getConf());
- partPath = new Path(tbl.getDataLocation().getPath(),
- Warehouse.makePartName(partSpec));
- } else {
- // Partition exists already. Get the path from the partition. This will
- // get the default path for Hive created partitions or the external path
- // when directly created by user
- partPath = part.getPath()[0];
- fs = partPath.getFileSystem(getConf());
+      Partition oldPart = getPartition(tbl, partSpec, false, null);
+      Path oldPartPath = null;
+      if (oldPart != null) {
+        oldPartPath = oldPart.getPartitionPath();
+      } else {
+        // partition does not exist yet: extrapolate its path from the
+        // table's location, as the removed branch above did; otherwise
+        // the dereference below would NPE for a brand-new partition
+        oldPartPath = new Path(tbl.getDataLocation().getPath(),
+            Warehouse.makePartName(partSpec));
+      }
+      // keep the partition's path component, but place it on the same
+      // dfs (scheme + authority) as the data being loaded
+      Path newPartPath = new Path(loadPath.toUri().getScheme(),
+          loadPath.toUri().getAuthority(), oldPartPath.toUri().getPath());
+
if (replace) {
- Hive.replaceFiles(loadPath, partPath, fs, tmpDirPath);
+ Hive.replaceFiles(loadPath, newPartPath, oldPartPath, tmpDirPath, getConf());
} else {
- Hive.copyFiles(loadPath, partPath, fs);
+ FileSystem fs = FileSystem.get(tbl.getDataLocation(), getConf());
+ Hive.copyFiles(loadPath, newPartPath, fs);
}
// recreate the partition if it existed before
if (!holdDDLTime) {
- part = getPartition(tbl, partSpec, true);
+ getPartition(tbl, partSpec, true, newPartPath.toString());
}
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
} catch (MetaException e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
}
-
}
/**
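
(Not part of the patch: the scheme/authority substitution above is the core of
this change, so here is a self-contained sketch of what the three-argument
Path constructor produces. Both namenode URIs are made up.)

    import org.apache.hadoop.fs.Path;

    public class PathRewriteDemo {
      public static void main(String[] args) {
        // hypothetical locations: data staged on the new dfs, partition
        // metadata still pointing at the old dfs
        Path loadPath = new Path("hdfs://nn-new:8020/tmp/hive-scratch/10000");
        Path oldPartPath =
            new Path("hdfs://nn-old:8020/user/hive/warehouse/t/ds=2010-10-01");

        // keep the path component, take scheme + authority from loadPath
        Path newPartPath = new Path(loadPath.toUri().getScheme(),
            loadPath.toUri().getAuthority(), oldPartPath.toUri().getPath());

        // prints hdfs://nn-new:8020/user/hive/warehouse/t/ds=2010-10-01
        System.out.println(newPartPath);
      }
    }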
@@ -1090,6 +1078,11 @@
return new Partition(tbl, partition);
}
+
+  public Partition getPartition(Table tbl, Map<String, String> partSpec,
+ boolean forceCreate) throws HiveException {
+ return getPartition(tbl, partSpec, forceCreate, null);
+ }
/**
* Returns partition metadata
@@ -1105,7 +1098,7 @@
+ * @param partPath
+ *          the location to set on the partition; if null, it is left untouched
* @throws HiveException
*/
public Partition getPartition(Table tbl, Map<String, String> partSpec,
- boolean forceCreate) throws HiveException {
+ boolean forceCreate, String partPath) throws HiveException {
if (!tbl.isValidSpec(partSpec)) {
throw new HiveException("Invalid partition: " + partSpec);
}
@@ -1147,6 +1140,10 @@
tpart.getSd().setOutputFormat(tbl.getTTable().getSd().getOutputFormat());
tpart.getSd().setInputFormat(tbl.getTTable().getSd().getInputFormat());
tpart.getSd().getSerdeInfo().setSerializationLib(tbl.getSerializationLib());
+ if (partPath != null
+ && !partPath.equalsIgnoreCase(tpart.getSd().getLocation())) {
+ tpart.getSd().setLocation(partPath);
+ }
alterPartition(tbl.getTableName(), new Partition(tbl, tpart));
}
}
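
(Not part of the patch: a hypothetical caller sketch of the two getPartition
overloads. The demo class and the partition URI are illustrative; tbl and
partSpec are assumed to describe an existing partitioned table.)

    import java.util.Map;

    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.hadoop.hive.ql.metadata.Table;

    public class GetPartitionDemo {
      static void demo(Table tbl, Map<String, String> partSpec)
          throws HiveException {
        Hive db = Hive.get();
        // 3-arg form: old behavior, partition location left untouched
        Partition p = db.getPartition(tbl, partSpec, true);
        // 4-arg form: force-create and repoint the partition at this path
        Partition moved = db.getPartition(tbl, partSpec, true,
            "hdfs://nn-new:8020/user/hive/warehouse/t/ds=2010-10-01");
      }
    }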
@@ -1443,19 +1440,31 @@
/**
* Replaces files in the partition with new data set specifed by srcf. Works
- * by moving files
+ * by moving files.
+ * srcf, destf, and tmppath should reside on the same dfs, but oldPath can be on a
+ * different dfs.
*
* @param srcf
* Files to be moved. Leaf Directories or Globbed File Paths
* @param destf
* The directory where the final data needs to go
- * @param fs
- * The filesystem handle
+ * @param oldPath
+ *          The directory holding the old partition data, which will be cleaned up.
* @param tmppath
* Temporary directory
*/
- static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
- Path tmppath) throws HiveException {
+ static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
+ Path tmppath, Configuration conf) throws HiveException {
+
+    FileSystem fs = null;
+    // use FsShell rather than a single FileSystem handle so that oldPath
+    // can be removed even when it lives on a different dfs than srcf/destf
+    FsShell fshell = new FsShell();
+    fshell.setConf(conf);
+ try {
+ fs = FileSystem.get(srcf.toUri(), conf);
+ } catch (IOException e1) {
+ throw new HiveException(e1.getMessage(), e1);
+ }
+
FileStatus[] srcs;
try {
srcs = fs.listStatus(srcf);
@@ -1483,8 +1492,17 @@
}
// point of no return
- boolean b = fs.delete(destf, true);
- LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
+    if (oldPath != null) {
+      try {
+        // best-effort: clean up the partition's old data, possibly on
+        // another dfs
+        fshell.run(new String[]{"-rmr", oldPath.toUri().toString()});
+      } catch (Exception e) {
+        // swallow the exception: the old location may already be gone
+      }
+    }
+    try {
+      fshell.run(new String[]{"-rmr", destf.toUri().toString()});
+    } catch (Exception e) {
+      // swallow the exception: destf may not exist yet
+    }
// create the parent directory otherwise rename can fail if the parent
// doesn't exist
@@ -1493,13 +1511,12 @@
+ destf.getParent().toString());
}
- b = fs.rename(tmppath, destf);
+ boolean b = fs.rename(tmppath, destf);
if (!b) {
throw new HiveException("Unable to move results from " + tmppath
+ " to destination directory: " + destf.getParent().toString());
}
LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
-
} catch (IOException e) {
throw new HiveException("replaceFiles: error while moving files from "
+ tmppath + " to " + destf + "!!!", e);
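
(Not part of the patch: replaceFiles now shells out to FsShell instead of
calling fs.delete() so that oldPath can live on a different dfs. A standalone
sketch of the pattern with a made-up path; FsShell.run returns a shell-style
exit code and may throw, which is why the patch wraps it in try/catch.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;

    public class RmrDemo {
      public static void main(String[] args) throws Exception {
        FsShell shell = new FsShell();
        shell.setConf(new Configuration());
        // same invocation style as the patch: recursive, works on any dfs
        int exit = shell.run(new String[] {"-rmr",
            "hdfs://nn-old:8020/user/hive/warehouse/t/ds=2010-10-01"});
        if (exit != 0) {
          System.err.println("-rmr failed with exit code " + exit);
        }
      }
    }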
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (revision 1022214)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (working copy)
@@ -524,13 +524,8 @@
* Temporary directory
*/
protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
- FileSystem fs;
- try {
- fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
- Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
- } catch (IOException e) {
- throw new HiveException("addFiles: filesystem error in check phase", e);
- }
+ Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), null, tmpd,
+ Hive.get().getConf());
}
/**
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1022214)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -3435,8 +3435,18 @@
dest_part = qbm.getDestPartitionForAlias(dest);
dest_tab = dest_part.getTable();
-      dest_path = dest_part.getPath()[0];
+      Path tabPath = dest_tab.getPath();
+      Path partPath = dest_part.getPartitionPath();
+
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVEPARTITIONUSETBLDFS)) {
+        // if the table is in a different dfs than the partition,
+        // replace the partition's dfs with the table's dfs.
+        dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
+            .getAuthority(), partPath.toUri().getPath());
+      } else {
+        dest_path = partPath;
+      }
if ("har".equalsIgnoreCase(dest_path.toUri().getScheme())) {
throw new SemanticException(ErrorMsg.OVERWRITE_ARCHIVED_PART
.getMsg());