diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6f7b958..d51b7c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2107,6 +2107,7 @@ public static String getResourceFiles(Configuration conf, SessionState.ResourceT
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
     Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
+    files = (ss == null) ? files : ss.try_hdfs_resource(t, files);
     if (files != null) {
       List<String> realFiles = new ArrayList<String>(files.size());
       for (String one : files) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 5ef440b..6ae8b9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -951,7 +952,12 @@ public LocalResource localizeResource(Path src, Path dest, LocalResourceType typ
       // do not overwrite.
       LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest);
       try {
-        destFS.copyFromLocalFile(false, false, src, dest);
+        // If src already lives on the destination filesystem there is nothing
+        // to copy; reuse the source path directly instead of re-uploading.
+        if (FileUtil.compareFs(destFS, src.getFileSystem(conf))) {
+          dest = src;
+        } else {
+          destFS.copyFromLocalFile(false, false, src, dest);
+        }
       } catch (IOException e) {
         LOG.info("Looks like another thread is writing the same file will wait.");
         int waitAttempts =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 540bafd..2769ca0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -1154,6 +1154,7 @@ public String add_resource(ResourceType t, String value, boolean convertToUnix)
     Set<String> resourceSet = resourceMaps.getResourceSet(t);
     Map<String, List<String>> resourcePathMap = resourceMaps.getResourcePathMap(t);
     Map<String, List<String>> reverseResourcePathMap = resourceMaps.getReverseResourcePathMap(t);
+    Map<String, String> hdfsCacheMap = resourceMaps.getHdfsCacheMap(t);
 
     try {
       for (String value : values) {
@@ -1168,6 +1169,7 @@ public String add_resource(ResourceType t, String value, boolean convertToUnix)
         } else {
           // for local file and hdfs, key and value are same.
           key = downloadedURLs.get(0).toString();
+          hdfsCacheMap.put(key, value);
         }
 
         Set<String> downloadedValues = new HashSet<String>();
@@ -1358,6 +1360,28 @@ public void delete_resources(ResourceType t) {
     }
   }
 
+  /**
+   * Substitutes each resource path in {@code files} with the original HDFS
+   * path it was registered from (as recorded by add_resource), when one is
+   * cached for the given resource type. Paths without a cached HDFS origin
+   * are passed through unchanged.
+   *
+   * @param t the resource type whose HDFS cache is consulted
+   * @param files resource paths to translate; may be null
+   * @return a new set with HDFS paths substituted, or null if files is null
+   */
+  public Set<String> try_hdfs_resource(ResourceType t, Set<String> files) {
+    if (files == null) {
+      return null;
+    }
+    Map<String, String> hdfsCacheMap = resourceMaps.getHdfsCacheMap(t);
+    Set<String> newFiles = new HashSet<>(files.size());
+    for (String file : files) {
+      String hdfsFile = hdfsCacheMap.get(file);
+      newFiles.add(hdfsFile != null ? hdfsFile : file);
+    }
+    return newFiles;
+  }
+
   public String getCommandType() {
     if (commandType == null) {
       return null;
@@ -1627,10 +1651,15 @@ public Timestamp getQueryCurrentTimestamp() {
     // stores all the downloaded resources as key and the jars which depend on these resources as values in form of a list. Used for deleting transitive dependencies.
     private final Map<String, List<String>> reverse_resource_path_map;
 
+    // per resource type: maps a resource key back to the original HDFS path it
+    // was added from, so execution engines can reuse the HDFS copy directly.
+    private final Map<SessionState.ResourceType, Map<String, String>> hdfsCacheMap;
+
     public ResourceMaps() {
       resource_map = new HashMap<SessionState.ResourceType, Set<String>>();
       resource_path_map = new HashMap<String, List<String>>();
       reverse_resource_path_map = new HashMap<String, List<String>>();
+      hdfsCacheMap = new HashMap<>();
     }
 
@@ -1665,4 +1694,17 @@ public ResourceMaps() {
       return result;
     }
 
+    /**
+     * Returns the resource-key to HDFS-path cache for the given resource
+     * type, creating an empty cache on first use.
+     */
+    public Map<String, String> getHdfsCacheMap(SessionState.ResourceType t) {
+      Map<String, String> result = hdfsCacheMap.get(t);
+      if (result == null) {
+        result = new HashMap<String, String>();
+        hdfsCacheMap.put(t, result);
+      }
+      return result;
+    }
+
   }