diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6f7b958f48..d51b7c567b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2107,6 +2107,9 @@ public static String getResourceFiles(Configuration conf, SessionState.ResourceT
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
     Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
+    if (ss != null) {
+      files = ss.try_hdfs_resource(t, files);
+    }
     if (files != null) {
       List<String> realFiles = new ArrayList<String>(files.size());
       for (String one : files) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 5ef440b42e..68e7308e8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -25,8 +25,10 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -934,6 +936,48 @@ private boolean checkPreExisting(Path src, Path dest, Configuration conf)
     return false;
   }
+  // CLOUDERA-BUILD. This was moved to FileUtil in Common, but we had to put it
+  // back due to a NoSuchMethodError during a rolling upgrade, because we have
+  // a newer MR2 with an older Common, which doesn't have the method.
+  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    if (srcFs == null || destFs == null) {
+      return false;
+    }
+    URI srcUri = srcFs.getUri();
+    URI dstUri = destFs.getUri();
+    if (srcUri.getScheme() == null) {
+      return false;
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false;
+    }
+    String srcHost = srcUri.getHost();
+    String dstHost = dstUri.getHost();
+    if ((srcHost != null) && (dstHost != null)) {
+      if (srcHost.equals(dstHost)) {
+        return srcUri.getPort() == dstUri.getPort();
+      }
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch (UnknownHostException ue) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Could not compare file-systems. Unknown host: ", ue);
+        }
+        return false;
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false;
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false;
+    } else if (srcHost != null) {
+      return false;
+    }
+    // check for ports
+    return srcUri.getPort() == dstUri.getPort();
+  }
+
   /**
    * @param src path to the source for the resource
    * @param dest path in hdfs for the resource
   */
@@ -951,7 +995,13 @@ public LocalResource localizeResource(Path src, Path dest, LocalResourceType typ
     // do not overwrite.
LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest); try { - destFS.copyFromLocalFile(false, false, src, dest); + if (compareFs(destFS, src.getFileSystem(conf))) + dest = src; + else + destFS.copyFromLocalFile(false, false, src, dest); } catch (IOException e) { LOG.info("Looks like another thread is writing the same file will wait."); int waitAttempts = diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 540bafdc45..2769ca0f57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -1154,6 +1154,8 @@ public String add_resource(ResourceType t, String value, boolean convertToUnix) Set resourceSet = resourceMaps.getResourceSet(t); Map> resourcePathMap = resourceMaps.getResourcePathMap(t); Map> reverseResourcePathMap = resourceMaps.getReverseResourcePathMap(t); + Map hdfsCacheMap = resourceMaps.getHdfsCacheMap(t); + List localized = new ArrayList(); try { for (String value : values) { @@ -1168,6 +1170,7 @@ public String add_resource(ResourceType t, String value, boolean convertToUnix) } else { // for local file and hdfs, key and value are same. key = downloadedURLs.get(0).toString(); + hdfsCacheMap.put(key, value); } Set downloadedValues = new HashSet(); @@ -1358,6 +1361,17 @@ public void delete_resources(ResourceType t) { } } + public Set try_hdfs_resource(ResourceType t, Set files) { + Map hdfsCacheMap = resourceMaps.getHdfsCacheMap(t); + Set newFiles = new HashSet<>(files.size()); + resourceMaps.getHdfsCacheMap(t); + for(String file : files){ + String hdfsFile = hdfsCacheMap.get(file); + newFiles.add(hdfsFile!=null ? hdfsFile:file); + } + return newFiles; + } + public String getCommandType() { if (commandType == null) { return null; @@ -1627,10 +1641,13 @@ public Timestamp getQueryCurrentTimestamp() { // stores all the downloaded resources as key and the jars which depend on these resources as values in form of a list. Used for deleting transitive dependencies. private final Map>> reverse_resource_path_map; + private final Map> hdfsCacheMap; + public ResourceMaps() { resource_map = new HashMap>(); resource_path_map = new HashMap>>(); reverse_resource_path_map = new HashMap>>(); + hdfsCacheMap = new HashMap<>(); } @@ -1665,4 +1682,13 @@ public ResourceMaps() { return result; } + public Map getHdfsCacheMap(SessionState.ResourceType t) { + Map result = hdfsCacheMap.get(t); + if (result == null) { + result = new HashMap(); + hdfsCacheMap.put(t, result); + } + return result; + } + }