diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 271554c..1e2d664 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -99,16 +98,16 @@ public class Hive {
   private IMetaStoreClient metaStoreClient;
   private String currentDatabase;
 
-  private static ThreadLocal hiveDB = new ThreadLocal() {
+  private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
     @Override
-    protected synchronized Object initialValue() {
+    protected synchronized Hive initialValue() {
       return null;
     }
 
     @Override
     public synchronized void remove() {
       if (this.get() != null) {
-        ((Hive) this.get()).close();
+        this.get().close();
       }
       super.remove();
     }
@@ -1856,76 +1855,93 @@ public class Hive {
   static private void checkPaths(FileSystem fs, FileStatus[] srcs, Path destf,
       boolean replace) throws HiveException {
     try {
+      FileStatus destStatus = !replace && fs.exists(destf) ? fs.getFileStatus(destf) : null;
+      if (destStatus != null && !destStatus.isDir()) {
+        throw new HiveException("checkPaths: destination " + destf
+            + " should be a directory");
+      }
       for (FileStatus src : srcs) {
-        FileStatus[] items = fs.listStatus(src.getPath());
-        for (FileStatus item : items) {
-          Path itemStaging = item.getPath();
-
-          if (Utilities.isTempPath(item)) {
-            // This check is redundant because temp files are removed by
-            // execution layer before
-            // calling loadTable/Partition. But leaving it in just in case.
-            fs.delete(itemStaging, true);
-            continue;
-          }
-          if (item.isDir()) {
-            throw new HiveException("checkPaths: " + src.getPath()
-                + " has nested directory" + itemStaging);
-          }
-          if (!replace) {
-            // It's possible that the file we're copying may have the same
-            // relative name as an existing file in the "destf" directory.
-            // So let's make a quick check to see if we can rename any
-            // potential offenders so as to allow them to move into the
-            // "destf" directory. The scheme is dead simple: simply tack
-            // on "_copy_N" where N starts at 1 and works its way up until
-            // we find a free space.
-
-            // Note: there are race conditions here, but I don't believe
-            // they're worse than what was already present.
-            int counter = 1;
-
-            // Strip off the file type, if any so we don't make:
-            // 000000_0.gz -> 000000_0.gz_copy_1
-            String name = itemStaging.getName();
-            String filetype;
-            int index = name.lastIndexOf('.');
-            if (index >= 0) {
-              filetype = name.substring(index);
-              name = name.substring(0, index);
-            } else {
-              filetype = "";
+        if (src.isDir()) {
+          for (FileStatus item : fs.listStatus(src.getPath())) {
+            if (!checkPaths(fs, item, destf, replace)) {
+              throw new HiveException("checkPaths: " + src.getPath()
+                  + " has nested directory " + item.getPath());
             }
+          }
+        } else {
+          checkPaths(fs, src, destf, replace);
+        }
+      }
+    } catch (IOException e) {
+      throw new HiveException("checkPaths: filesystem error in check phase", e);
+    }
+  }
 
-            Path itemDest = new Path(destf, itemStaging.getName());
-            Path itemStagingBase = new Path(itemStaging.getParent(), name);
+  private static boolean checkPaths(FileSystem fs, FileStatus srcf, Path destf,
+      boolean replace) throws IOException {
+    Path itemStaging = srcf.getPath();
+
+    if (Utilities.isTempPath(srcf)) {
+      // This check is redundant because temp files are removed by
+      // execution layer before
+      // calling loadTable/Partition. But leaving it in just in case.
+      fs.delete(itemStaging, true);
+      return true;
+    }
+    if (srcf.isDir()) {
+      return false;
+    }
+    if (!replace) {
+      // It's possible that the file we're copying may have the same
+      // relative name as an existing file in the "destf" directory.
+      // So let's make a quick check to see if we can rename any
+      // potential offenders so as to allow them to move into the
+      // "destf" directory. The scheme is dead simple: simply tack
+      // on "_copy_N" where N starts at 1 and works its way up until
+      // we find a free space.
+
+      // Note: there are race conditions here, but I don't believe
+      // they're worse than what was already present.
+      int counter = 1;
+
+      // Strip off the file type, if any so we don't make:
+      // 000000_0.gz -> 000000_0.gz_copy_1
+      String name = itemStaging.getName();
+      String filetype;
+      int index = name.lastIndexOf('.');
+      if (index >= 0) {
+        filetype = name.substring(index);
+        name = name.substring(0, index);
+      } else {
+        filetype = "";
+      }
 
-            while (fs.exists(itemDest)) {
-              Path proposedStaging = itemStagingBase
-                  .suffix("_copy_" + counter++).suffix(filetype);
-              Path proposedDest = new Path(destf, proposedStaging.getName());
+      Path itemDest = new Path(destf, itemStaging.getName());
+      Path itemStagingBase = new Path(itemStaging.getParent(), name);
 
-              if (fs.exists(proposedDest)) {
-                // There's already a file in our destination directory with our
-                // _copy_N suffix. We've been here before...
-                LOG.trace(proposedDest + " already exists");
-                continue;
-              }
+      while (fs.exists(itemDest)) {
+        Path proposedStaging = itemStagingBase
+            .suffix("_copy_" + counter++).suffix(filetype);
+        Path proposedDest = new Path(destf, proposedStaging.getName());
 
-              if (!fs.rename(itemStaging, proposedStaging)) {
-                LOG.debug("Unsuccessfully in attempt to rename " + itemStaging + " to " + proposedStaging + "...");
-                continue;
-              }
+        if (fs.exists(proposedDest)) {
+          // There's already a file in our destination directory with our
+          // _copy_N suffix. We've been here before...
+          LOG.trace(proposedDest + " already exists");
+          continue;
+        }
 
-              LOG.debug("Successfully renamed " + itemStaging + " to " + proposedStaging);
-              itemDest = proposedDest;
-            }
-          }
+        if (!fs.rename(itemStaging, proposedStaging)) {
+          LOG.debug("Unsuccessful attempt to rename " + itemStaging
+              + " to " + proposedStaging + "...");
+          continue;
         }
+
+        LOG.debug("Successfully renamed " + itemStaging + " to " + proposedStaging);
+        itemDest = proposedDest;
+      }
       }
-    } catch (IOException e) {
-      throw new HiveException("checkPaths: filesystem error in check phase", e);
     }
+    return true;
   }
 
   static protected void copyFiles(Path srcf, Path destf, FileSystem fs)
@@ -1959,13 +1975,12 @@ public class Hive {
     // move it, move it
     try {
       for (FileStatus src : srcs) {
-        FileStatus[] items = fs.listStatus(src.getPath());
-        for (FileStatus item : items) {
-          Path source = item.getPath();
-          Path target = new Path(destf, item.getPath().getName());
-          if (!fs.rename(source, target)) {
-            throw new IOException("Cannot move " + source + " to " + target);
+        if (src.isDir()) {
+          for (FileStatus item : fs.listStatus(src.getPath())) {
+            copyFile(item.getPath(), destf, fs);
           }
+        } else {
+          copyFile(src.getPath(), destf, fs);
         }
       }
     } catch (IOException e) {
@@ -1973,6 +1988,13 @@ public class Hive {
     }
   }
 
+  private static void copyFile(Path srcf, Path destf, FileSystem fs) throws IOException {
+    Path target = new Path(destf, srcf.getName());
+    if (!fs.rename(srcf, target)) {
+      throw new IOException("Cannot move " + srcf + " to " + target);
+    }
+  }
+
   /**
    * Replaces files in the partition with new data set specified by srcf. Works
    * by renaming directory of srcf to the destination file.
diff --git ql/src/test/queries/clientpositive/load_fs.q ql/src/test/queries/clientpositive/load_fs.q
index c1ac29c..d5571b9 100644
--- ql/src/test/queries/clientpositive/load_fs.q
+++ ql/src/test/queries/clientpositive/load_fs.q
@@ -19,3 +19,17 @@ select count(*) from load_overwrite2;
 load data inpath '${system:test.tmp.dir}/load2_*' overwrite into table load_overwrite;
 show table extended like load_overwrite;
 select count(*) from load_overwrite;
+
+load data inpath '${system:test.tmp.dir}/load_overwrite/kv1.txt' into table load_overwrite2;
+show table extended like load_overwrite;
+show table extended like load_overwrite2;
+
+load data inpath '${system:test.tmp.dir}/load_overwrite/kv2.txt' into table load_overwrite2;
+show table extended like load_overwrite;
+show table extended like load_overwrite2;
+
+load data inpath '${system:test.tmp.dir}/load_overwrite/kv3.txt' into table load_overwrite2;
+show table extended like load_overwrite;
+show table extended like load_overwrite2;
+
+select count(*) from load_overwrite2;
diff --git ql/src/test/results/clientpositive/load_fs.q.out ql/src/test/results/clientpositive/load_fs.q.out
index 7f3d23b..630266f 100644
--- ql/src/test/results/clientpositive/load_fs.q.out
+++ ql/src/test/results/clientpositive/load_fs.q.out
@@ -132,3 +132,132 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@load_overwrite
 #### A masked pattern was here ####
 1025
+#### A masked pattern was here ####
+PREHOOK: type: LOAD
+PREHOOK: Output: default@load_overwrite2
+#### A masked pattern was here ####
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@load_overwrite2
+PREHOOK: query: show table extended like load_overwrite
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite
+POSTHOOK: type: SHOW_TABLESTATUS
+tableName:load_overwrite
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:2
+totalFileSize:6007
+maxFileSize:5791
+minFileSize:216
+#### A masked pattern was here ####
+
+PREHOOK: query: show table extended like load_overwrite2
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite2
+POSTHOOK: type: SHOW_TABLESTATUS
+tableName:load_overwrite2
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:1
+totalFileSize:5812
+maxFileSize:5812
+minFileSize:5812
+#### A masked pattern was here ####
+
+#### A masked pattern was here ####
+PREHOOK: type: LOAD
+PREHOOK: Output: default@load_overwrite2
+#### A masked pattern was here ####
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@load_overwrite2
+PREHOOK: query: show table extended like load_overwrite
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite
+POSTHOOK: type: SHOW_TABLESTATUS
+tableName:load_overwrite
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:1
+totalFileSize:216
+maxFileSize:216
+minFileSize:216
+#### A masked pattern was here ####
+
+PREHOOK: query: show table extended like load_overwrite2
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite2
+POSTHOOK: type: SHOW_TABLESTATUS
+tableName:load_overwrite2
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:2
+totalFileSize:11603
+maxFileSize:5812
+minFileSize:5791
+#### A masked pattern was here ####
+
+#### A masked pattern was here ####
+PREHOOK: type: LOAD
+PREHOOK: Output: default@load_overwrite2
+#### A masked pattern was here ####
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@load_overwrite2
+PREHOOK: query: show table extended like load_overwrite
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite
+POSTHOOK: type: SHOW_TABLESTATUS
+tableName:load_overwrite
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:0
+totalFileSize:0
+maxFileSize:0
+minFileSize:0
+#### A masked pattern was here ####
+
+PREHOOK: query: show table extended like load_overwrite2
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like load_overwrite2
+POSTHOOK: type: SHOW_TABLESTATUS
+tableName:load_overwrite2
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:3
+totalFileSize:11819
+maxFileSize:5812 +minFileSize:216 +#### A masked pattern was here #### + +PREHOOK: query: select count(*) from load_overwrite2 +PREHOOK: type: QUERY +PREHOOK: Input: default@load_overwrite2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from load_overwrite2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@load_overwrite2 +#### A masked pattern was here #### +1025
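
Note on the non-overwrite path: the heart of it is the "_copy_N" naming scheme in the new single-file checkPaths() helper. Below is a minimal standalone sketch of that scheme, assuming only hadoop-common on the classpath; the names CopySuffixSketch and nextFreeTarget are hypothetical, and where the patch renames the staging file (so the later move into destf cannot collide), this sketch simply computes the first free destination name.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopySuffixSketch {
  // Find the first free name in destDir for srcName using the patch's
  // "_copy_N" scheme, probing N = 1, 2, 3, ... until nothing collides.
  static Path nextFreeTarget(FileSystem fs, Path destDir, String srcName)
      throws IOException {
    // Split "000000_0.gz" into "000000_0" and ".gz" so the counter lands
    // before the extension: 000000_0_copy_1.gz, not 000000_0.gz_copy_1.
    int dot = srcName.lastIndexOf('.');
    String base = dot >= 0 ? srcName.substring(0, dot) : srcName;
    String filetype = dot >= 0 ? srcName.substring(dot) : "";

    Path target = new Path(destDir, srcName);
    int counter = 1;
    while (fs.exists(target)) {
      // Same race-condition caveat as the comment in the patch: another
      // writer may claim this name between the exists() check and use.
      target = new Path(destDir, base + "_copy_" + counter++ + filetype);
    }
    return target;
  }
}

For example, loading 000000_0.gz into a directory that already holds 000000_0.gz and 000000_0_copy_1.gz yields 000000_0_copy_2.gz.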
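The reworked checkPaths() and copyFiles() also now share the same one-level traversal: a top-level source may be a plain file or a directory of files, and a directory nested inside a source directory is an error. A minimal sketch of that dispatch follows, with the hypothetical names OneLevelWalkSketch, FileOp and forEachSourceFile.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OneLevelWalkSketch {
  interface FileOp {
    void apply(Path file) throws IOException;
  }

  // Apply op to every plain file among the sources, descending exactly one
  // level into source directories and rejecting anything nested deeper,
  // mirroring the "has nested directory" check in the patch.
  static void forEachSourceFile(FileSystem fs, FileStatus[] srcs, FileOp op)
      throws IOException {
    for (FileStatus src : srcs) {
      if (src.isDir()) {
        for (FileStatus item : fs.listStatus(src.getPath())) {
          if (item.isDir()) {
            throw new IOException(src.getPath() + " has nested directory "
                + item.getPath());
          }
          op.apply(item.getPath());
        }
      } else {
        op.apply(src.getPath());
      }
    }
  }
}

The new load_fs.q statements exercise the single-file branch of this walk: each LOAD DATA INPATH moves kv1.txt, kv2.txt and kv3.txt out of load_overwrite's directory into load_overwrite2, so the q.out shows load_overwrite ending with totalNumberFiles:0 while load_overwrite2 ends with 3 files totalling 11819 bytes (5812 + 5791 + 216) and a final count(*) of 1025.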