Index: ql/src/test/results/clientpositive/load_overwrite.q.out
===================================================================
--- ql/src/test/results/clientpositive/load_overwrite.q.out (revision 0)
+++ ql/src/test/results/clientpositive/load_overwrite.q.out (revision 0)
@@ -0,0 +1,125 @@
+PREHOOK: query: create table load_overwrite like src
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table load_overwrite like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@load_overwrite
+PREHOOK: query: insert overwrite table load_overwrite select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@load_overwrite
+POSTHOOK: query: insert overwrite table load_overwrite select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@load_overwrite
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: show table extended like load_overwrite
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+tableName:load_overwrite
+owner:null
+location:pfile:/data/users/nzhang/work/2/apache-hive/build/ql/test/data/warehouse/load_overwrite
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:1
+totalFileSize:5812
+maxFileSize:5812
+minFileSize:5812
+lastAccessTime:0
+lastUpdateTime:1292465082000
+
+PREHOOK: query: select count(*) from load_overwrite
+PREHOOK: type: QUERY
+PREHOOK: Input: default@load_overwrite
+PREHOOK: Output: file:/tmp/nzhang/hive_2010-12-15_18-04-43_366_2197078026513947066/-mr-10000
+POSTHOOK: query: select count(*) from load_overwrite
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@load_overwrite
+POSTHOOK: Output: file:/tmp/nzhang/hive_2010-12-15_18-04-43_366_2197078026513947066/-mr-10000
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+500
+PREHOOK: query: load data local inpath '../data/files/kv1.txt' into table load_overwrite
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/kv1.txt' into table load_overwrite
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@load_overwrite
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: show table extended like load_overwrite
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+tableName:load_overwrite
+owner:null
+location:pfile:/data/users/nzhang/work/2/apache-hive/build/ql/test/data/warehouse/load_overwrite
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:2
+totalFileSize:11624
+maxFileSize:5812
+minFileSize:5812
+lastAccessTime:0
+lastUpdateTime:1292465086000
+
+PREHOOK: query: select count(*) from load_overwrite
+PREHOOK: type: QUERY
+PREHOOK: Input: default@load_overwrite
+PREHOOK: Output: file:/tmp/nzhang/hive_2010-12-15_18-04-46_973_8511923253102164649/-mr-10000
+POSTHOOK: query: select count(*) from load_overwrite
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@load_overwrite
+POSTHOOK: Output: file:/tmp/nzhang/hive_2010-12-15_18-04-46_973_8511923253102164649/-mr-10000
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+1000
+PREHOOK: query: load data local inpath '../data/files/kv1.txt' overwrite into table load_overwrite
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/kv1.txt' overwrite into table load_overwrite
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@load_overwrite
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: show table extended like load_overwrite
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+tableName:load_overwrite
+owner:null
+location:pfile:/data/users/nzhang/work/2/apache-hive/build/ql/test/data/warehouse/load_overwrite
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:1
+totalFileSize:5812
+maxFileSize:5812
+minFileSize:5812
+lastAccessTime:0
+lastUpdateTime:1292465090000
+
+PREHOOK: query: select count(*) from load_overwrite
+PREHOOK: type: QUERY
+PREHOOK: Input: default@load_overwrite
+PREHOOK: Output: file:/tmp/nzhang/hive_2010-12-15_18-04-50_537_1323277203199221916/-mr-10000
+POSTHOOK: query: select count(*) from load_overwrite
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@load_overwrite
+POSTHOOK: Output: file:/tmp/nzhang/hive_2010-12-15_18-04-50_537_1323277203199221916/-mr-10000
+POSTHOOK: Lineage: load_overwrite.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: load_overwrite.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+500
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (revision 1049722)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (working copy)
@@ -29,9 +29,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -49,7 +48,6 @@
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -118,7 +116,7 @@
       db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
       db.createTable(src, cols, null, TextInputFormat.class,
           IgnoreKeyTextOutputFormat.class);
-      db.loadTable(hadoopDataFile[i], src, false, null, false);
+      db.loadTable(hadoopDataFile[i], src, false, false);
       i++;
     }
 
Index: ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (revision 1049722)
+++ ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QTestUtil.QTestSetup;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
 import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
@@ -39,7 +40,6 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.tools.LineageInfo;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.hive.ql.QTestUtil.QTestSetup;
 
 /**
  * TestHiveHistory.
@@ -104,7 +104,7 @@
         db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, null, false);
+        db.loadTable(hadoopDataFile[i], src, false, false);
         i++;
       }
 
Index: ql/src/test/queries/clientpositive/load_overwrite.q
===================================================================
--- ql/src/test/queries/clientpositive/load_overwrite.q (revision 0)
+++ ql/src/test/queries/clientpositive/load_overwrite.q (revision 0)
@@ -0,0 +1,15 @@
+create table load_overwrite like src;
+
+insert overwrite table load_overwrite select * from src;
+show table extended like load_overwrite;
+select count(*) from load_overwrite;
+
+
+load data local inpath '../data/files/kv1.txt' into table load_overwrite;
+show table extended like load_overwrite;
+select count(*) from load_overwrite;
+
+
+load data local inpath '../data/files/kv1.txt' overwrite into table load_overwrite;
+show table extended like load_overwrite;
+select count(*) from load_overwrite;
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (revision 1049722)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (working copy)
@@ -515,17 +515,15 @@
   }
 
   /**
-   * Replaces files in the partition with new data set specified by srcf. Works
-   * by moving files
+   * Replaces the directory corresponding to the table with srcf. Works by
+   * deleting the table directory and renaming the source directory.
    *
    * @param srcf
-   *          Files to be replaced. Leaf directories or globbed file paths
+   *          Source directory
-   * @param tmpd
-   *          Temporary directory
    */
-  protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
-    Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), null, tmpd,
-        Hive.get().getConf());
+  protected void replaceFiles(Path srcf) throws HiveException {
+    Path tableDest = new Path(getDataLocation().getPath());
+    Hive.replaceFiles(srcf, tableDest, tableDest, Hive.get().getConf());
   }
 
   /**
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 1049722)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -42,7 +42,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
@@ -950,8 +949,7 @@
    *          The temporary directory.
    */
   public void loadPartition(Path loadPath, String tableName,
-      Map<String, String> partSpec, boolean replace, Path tmpDirPath,
-      boolean holdDDLTime)
+      Map<String, String> partSpec, boolean replace, boolean holdDDLTime)
       throws HiveException {
     Table tbl = getTable(tableName);
     try {
@@ -975,7 +973,7 @@
           .toUri().getAuthority(), partPath.toUri().getPath());
 
       if (replace) {
-        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, tmpDirPath, getConf());
+        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf());
       } else {
         FileSystem fs = FileSystem.get(tbl.getDataLocation(), getConf());
         Hive.copyFiles(loadPath, newPartPath, fs);
@@ -1010,7 +1008,7 @@
    */
   public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
-      Path tmpDirPath, int numDP, boolean holdDDLTime)
+      int numDP, boolean holdDDLTime)
       throws HiveException {
 
     try {
@@ -1045,7 +1043,7 @@
         fullPartSpecs.add(fullPartSpec);
 
         // finally load the partition -- move the file to the final table address
-        loadPartition(partPath, tableName, fullPartSpec, replace, tmpDirPath, holdDDLTime);
+        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
       return fullPartSpecs;
@@ -1070,11 +1068,11 @@
    *          The temporary directory.
    */
   public void loadTable(Path loadPath, String tableName, boolean replace,
-      Path tmpDirPath, boolean holdDDLTime) throws HiveException {
+      boolean holdDDLTime) throws HiveException {
     Table tbl = getTable(tableName);
 
     if (replace) {
-      tbl.replaceFiles(loadPath, tmpDirPath);
+      tbl.replaceFiles(loadPath);
     } else {
       tbl.copyFiles(loadPath);
     }
@@ -1503,90 +1501,55 @@
   }
 
   /**
-   * Replaces files in the partition with new data set specifed by srcf. Works
-   * by moving files.
-   * srcf, destf, and tmppath should resident in the same dfs, but the oldPath can be in a
-   * different dfs.
+   * Replaces files in the partition with the new data set specified by srcf. Works
+   * by renaming the srcf directory to the destination directory.
+   * srcf and destf should reside in the same DFS, but oldPath can be in a
+   * different DFS.
    *
    * @param srcf
-   *          Files to be moved. Leaf Directories or Globbed File Paths
+   *          Source directory to be renamed to destf. It should be a
+   *          leaf directory where the final data files reside. However it
+   *          could potentially contain subdirectories as well.
    * @param destf
    *          The directory where the final data needs to go
    * @param oldPath
    *          The directory where the old data location, need to be cleaned up.
-   * @param tmppath
-   *          Temporary directory
    */
   static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
-      Path tmppath, Configuration conf) throws HiveException {
+      Configuration conf) throws HiveException {
-    FileSystem fs = null;
-    FsShell fshell = new FsShell();
-    fshell.setConf(conf);
     try {
-      fs = FileSystem.get(srcf.toUri(), conf);
-    } catch (IOException e1) {
-      throw new HiveException(e1.getMessage(), e1);
-    }
+      FileSystem fs = srcf.getFileSystem(conf);
-    FileStatus[] srcs;
-    try {
-      srcs = fs.listStatus(srcf);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
-    if (srcs == null) {
-      LOG.info("No sources specified to move: " + srcf);
-      return;
-      // srcs = new FileStatus[0]; Why is this needed?
-    }
-    checkPaths(fs, srcs, destf, true);
+      assert(fs.getFileStatus(srcf).isDir());
-    try {
-      fs.mkdirs(tmppath);
-      for (FileStatus src : srcs) {
-        FileStatus[] items = fs.listStatus(src.getPath());
-        for (int j = 0; j < items.length; j++) {
-          if (!fs.rename(items[j].getPath(), new Path(tmppath, items[j]
-              .getPath().getName()))) {
-            throw new HiveException("Error moving: " + items[j].getPath()
-                + " into: " + tmppath);
-          }
-        }
-      }
-
-      // point of no return
+      // point of no return
       if (oldPath != null) {
         try {
-          fshell.run(new String[]{"-rmr", oldPath.toUri().toString()});
+          // oldPath can be in a different file system
+          FileSystem fs2 = oldPath.getFileSystem(conf);
+          fs2.delete(oldPath, true); // remove the whole directory
         } catch (Exception e) {
-          //swallow the exception
+          // swallow the exception
+          LOG.warn("Directory " + oldPath.toString() + " cannot be removed.");
         }
       }
-      try {
-        fshell.run(new String[]{"-rmr", destf.toUri().toString()});
-      } catch (Exception e) {
-      }
-      // create the parent directory otherwise rename can fail if the parent
-      // doesn't exist
-      if (!fs.mkdirs(destf.getParent())) {
-        throw new HiveException("Unable to create destination directory: "
-            + destf.getParent().toString());
+      // rename can fail if the parent doesn't exist
+      if (!fs.exists(destf.getParent())) {
+        fs.mkdirs(destf.getParent());
       }
-      boolean b = fs.rename(tmppath, destf);
+      boolean b = fs.rename(srcf, destf);
       if (!b) {
-        throw new HiveException("Unable to move results from " + tmppath
-            + " to destination directory: " + destf.getParent().toString());
+        throw new HiveException("Unable to move results from " + srcf.toString()
+            + " to destination directory: " + destf.toString());
       }
-      LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
+      LOG.debug("Renaming:" + srcf.toString() + ",Status:" + b);
+
     } catch (IOException e) {
-      throw new HiveException("replaceFiles: error while moving files from "
-          + tmppath + " to " + destf + "!!!", e);
+      throw new HiveException(e.getMessage(), e);
    }
-    // In case of error, we should leave the temporary data there, so
-    // that user can recover the data if necessary.
   }
 
   /**
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (revision 1049722)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (working copy)
@@ -171,8 +171,7 @@
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
           db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
-              .getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()),
-              tbd.getHoldDDLTime());
+              .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime());
           if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(table));
           }
@@ -202,7 +201,6 @@
                 tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(),
                 tbd.getReplace(),
-                new Path(tbd.getTmpDir()),
                 dpCtx.getNumDPCols(),
                 tbd.getHoldDDLTime());
 
@@ -243,8 +241,7 @@
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
             db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()),
-                tbd.getHoldDDLTime());
+                tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime());
             Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
             dc = new DataContainer(table.getTTable(), partn.getTPartition());
             // add this partition to post-execution hook
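
For illustration, the rename-based replacement that this patch switches to can be sketched standalone against the Hadoop FileSystem API. This is a minimal sketch only: the class and method names below are hypothetical and not part of the patch, and it omits Hive-specific plumbing such as the metastore updates and the isDir assertion.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper mirroring the control flow of the new
// Hive.replaceFiles(srcf, destf, oldPath, conf) above.
public class ReplaceDirSketch {

  public static void replaceDir(Path srcf, Path destf, Path oldPath,
      Configuration conf) throws IOException {
    // srcf and destf must live on the same file system for rename to work.
    FileSystem fs = srcf.getFileSystem(conf);

    // Point of no return: drop the old data first. oldPath may be on a
    // different file system, so it is resolved independently.
    if (oldPath != null) {
      FileSystem oldFs = oldPath.getFileSystem(conf);
      oldFs.delete(oldPath, true); // recursive delete of the old directory
    }

    // rename() fails if the destination's parent directory is missing.
    if (!fs.exists(destf.getParent())) {
      fs.mkdirs(destf.getParent());
    }

    // A single directory rename replaces the old per-file move loop
    // through a temporary directory.
    if (!fs.rename(srcf, destf)) {
      throw new IOException("Unable to move " + srcf + " to " + destf);
    }
  }
}

Compared with the old implementation, this avoids going through FsShell for "-rmr" and skips the intermediate tmppath moves entirely, at the cost of requiring srcf and destf to share a file system.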