commit ce8c5ba8fc56eefeb2bc5e7d8bf82e2e27141047
Author: Mithun RK
Date:   Thu Sep 21 16:20:32 2017 -0700

    HIVE-17574: Avoid multiple copies of HDFS-based jars when localizing job-jars (Chris Drome, reviewed by Mithun Radhakrishnan)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ce391fdcda..78831f2cce 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1029,6 +1029,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVEADDEDFILES("hive.added.files.path", "", "This an internal parameter."),
     HIVEADDEDJARS("hive.added.jars.path", "", "This an internal parameter."),
     HIVEADDEDARCHIVES("hive.added.archives.path", "", "This an internal parameter."),
+    HIVEADDFILESUSEHDFSLOCATION("hive.resource.use.hdfs.location", true, "Reference HDFS-based files/jars directly, instead of "
+        + "copying them to the session-based HDFS scratch directory, to make better use of the distributed cache."),

     HIVE_CURRENT_DATABASE("hive.current.database", "", "Database name used by current session. Internal usage only.", true),

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ae63727999..8311037570 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1763,9 +1763,27 @@ public static String getNameMessage(Throwable e) {
   }

   public static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
-    // fill in local files to be added to the task environment
+    // fill in local files (includes local copies of HDFS files) to be added to the task environment
     SessionState ss = SessionState.get();
     Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
+    return validateFiles(conf, files);
+  }
+
+  public static String getHdfsResourceFiles(Configuration conf, SessionState.ResourceType type) {
+    // fill in HDFS files to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_hdfs_resource(type);
+    return validateFiles(conf, files);
+  }
+
+  public static String getLocalResourceFiles(Configuration conf, SessionState.ResourceType type) {
+    // fill in local-only files (excludes local copies of HDFS files) to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_local_resource(type);
+    return validateFiles(conf, files);
+  }
+
+  private static String validateFiles(Configuration conf, Set<String> files) {
     if (files != null) {
       List<String> realFiles = new ArrayList<String>(files.size());
       for (String one : files) {
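For orientation before the Tez changes below: the new flag is an ordinary boolean ConfVar, so callers can read and toggle it through the standard HiveConf accessors. A minimal sketch (the HiveConf class and the ConfVar name come from the patch; the demo class itself is hypothetical):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class HdfsLocationFlagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // true (the default) references hdfs:// resources in place;
        // false restores the old behavior of copying them into the
        // session-based scratch directory.
        conf.setBoolVar(HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION, false);
        System.out.println(HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION)); // prints false
      }
    }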
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e7f2400931..aae3480379 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -870,13 +870,49 @@ public Path getDefaultDestDir(Configuration conf) throws LoginException, IOExcep
       String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();

-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
-        getTempFilesFromConf(conf), null);
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION)) {
+      // reference HDFS-based resources directly, to use the distributed cache efficiently.
+      addHdfsResource(conf, tmpResources, LocalResourceType.FILE, getHdfsTempFilesFromConf(conf));
+      // local resources are session based.
+      addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getLocalTempFilesFromConf(conf), null);
+    } else {
+      // all resources, including those on HDFS, are session based.
+      addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf), null);
+    }
+
     addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
         getTempArchivesFromConf(conf), null);
     return tmpResources;
   }

+  private void addHdfsResource(Configuration conf, List<LocalResource> tmpResources,
+      LocalResourceType type, String[] files) throws IOException {
+    for (String file : files) {
+      if (StringUtils.isNotBlank(file)) {
+        Path dest = new Path(file);
+        FileSystem destFS = dest.getFileSystem(conf);
+        LocalResource localResource = createLocalResource(destFS, dest, type,
+            LocalResourceVisibility.PRIVATE);
+        tmpResources.add(localResource);
+      }
+    }
+  }
+
+  private static String[] getHdfsTempFilesFromConf(Configuration conf) {
+    String addedFiles = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.FILE);
+    String addedJars = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.JAR);
+    String allFiles = addedJars + "," + addedFiles;
+    return allFiles.split(",");
+  }
+
+  private static String[] getLocalTempFilesFromConf(Configuration conf) {
+    String addedFiles = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.FILE);
+    String addedJars = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.JAR);
+    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+    String allFiles = auxJars + "," + addedJars + "," + addedFiles;
+    return allFiles.split(",");
+  }
+
   public static String[] getTempFilesFromConf(Configuration conf) {
     if (conf == null) return new String[0]; // In tests.
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
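One subtlety in the two new helpers above: the jar and file lists are joined with a bare comma before splitting, so a blank component produces an empty token. This is why addHdfsResource() guards every entry with StringUtils.isNotBlank. A small illustration (values invented; statements as they would appear inside any method):

    // Joining a blank jar list with one hypothetical file path:
    String addedJars = "";
    String addedFiles = "hdfs:///apps/udfs/some-udf.jar";
    String[] tokens = (addedJars + "," + addedFiles).split(",");
    // tokens == { "", "hdfs:///apps/udfs/some-udf.jar" }; the empty first
    // token is skipped by the StringUtils.isNotBlank(file) check.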
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6dece05c3a..4820fed034 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -1448,7 +1448,7 @@ public String add_resource(ResourceType t, String value, boolean convertToUnix)
     String key;

     //get the local path of downloaded jars.
-    List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix);
+    List<URI> downloadedURLs = resolveAndDownload(t, value, convertToUnix);

     if (ResourceDownloader.isIvyUri(value)) {
       // get the key to store in map
@@ -1494,9 +1494,14 @@ public String add_resource(ResourceType t, String value, boolean convertToUnix)
   }

   @VisibleForTesting
-  protected List<URI> resolveAndDownload(String value, boolean convertToUnix)
+  protected List<URI> resolveAndDownload(ResourceType resourceType, String value, boolean convertToUnix)
       throws URISyntaxException, IOException {
-    return resourceDownloader.resolveAndDownload(value, convertToUnix);
+    List<URI> uris = resourceDownloader.resolveAndDownload(value, convertToUnix);
+    if (ResourceDownloader.isHdfsUri(value)) {
+      assert uris.size() == 1 : "There should be exactly one localized URI for an HDFS resource.";
+      resourceMaps.getLocalHdfsLocationMap(resourceType).put(uris.get(0).toString(), value);
+    }
+    return uris;
   }

   public void delete_resources(ResourceType t, List<String> values) {
@@ -1514,12 +1519,18 @@ public void delete_resources(ResourceType t, List<String> values) {
       if (ResourceDownloader.isIvyUri(value)) {
         key = ResourceDownloader.createURI(value).getAuthority();
       }
+      else if (ResourceDownloader.isHdfsUri(value)) {
+        // remove the local copy of the HDFS resource from the mapping,
+        // and delete it using its local path as the key.
+        String removedKey = removeHdfsFilesFromMapping(t, value);
+        if (removedKey != null) {
+          key = removedKey;
+        }
+      }
     } catch (URISyntaxException e) {
       throw new RuntimeException("Invalid uri string " + value + ", " + e.getMessage());
     }

     // get all the dependencies to delete
-
     Set<String> resourcePaths = resourcePathMap.get(key);
     if (resourcePaths == null) {
       return;
@@ -1539,7 +1550,6 @@ public void delete_resources(ResourceType t, List<String> values) {
     resources.removeAll(deleteList);
   }
-
   public Set<String> list_resource(ResourceType t, List<String> filter) {
     Set<String> orig = resourceMaps.getResourceSet(t);
     if (orig == null) {
@@ -1558,11 +1568,53 @@ public void delete_resources(ResourceType t, List<String> values) {
     }
   }

+  private String removeHdfsFilesFromMapping(ResourceType t, String file) {
+    String key = null;
+    if (resourceMaps.getLocalHdfsLocationMap(t).containsValue(file)) {
+      for (Map.Entry<String, String> entry : resourceMaps.getLocalHdfsLocationMap(t).entrySet()) {
+        if (entry.getValue().equals(file)) {
+          key = entry.getKey();
+          resourceMaps.getLocalHdfsLocationMap(t).remove(key);
+          // at most one local copy maps to a given HDFS file; stopping here
+          // also avoids a ConcurrentModificationException from the iteration.
+          break;
+        }
+      }
+    }
+    return key;
+  }
+
+  public Set<String> list_local_resource(ResourceType type) {
+    Set<String> resources = new HashSet<String>(list_resource(type, null));
+    Set<String> set = resourceMaps.getResourceSet(type);
+    for (String file : set) {
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).containsKey(file)) {
+        resources.remove(file);
+      }
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).containsKey(file)) {
+        resources.remove(file);
+      }
+    }
+    return resources;
+  }
+
+  public Set<String> list_hdfs_resource(ResourceType type) {
+    Set<String> set = resourceMaps.getResourceSet(type);
+    Set<String> result = new HashSet<String>();
+    for (String file : set) {
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).containsKey(file)) {
+        result.add(resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).get(file));
+      }
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).containsKey(file)) {
+        result.add(resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).get(file));
+      }
+    }
+    return result;
+  }
+
   public void delete_resources(ResourceType t) {
     Set<String> resources = resourceMaps.getResourceSet(t);
     if (resources != null && !resources.isEmpty()) {
       delete_resources(t, new ArrayList<String>(resources));
       resourceMaps.getResourceMap().remove(t);
+      resourceMaps.getAllLocalHdfsLocationMap().remove(t);
     }
   }
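To make the new bookkeeping concrete: resolveAndDownload() now records localized-path -> original-HDFS-URI pairs, which list_hdfs_resource() reads back and removeHdfsFilesFromMapping() deletes. A hypothetical sketch of a single entry (paths invented; requires java.util.HashMap and java.util.Map):

    Map<String, String> localHdfsLocationMap = new HashMap<String, String>();
    localHdfsLocationMap.put(
        "file:/tmp/hive-session/some-udf.jar",  // key: local copy created by the download
        "hdfs:///apps/udfs/some-udf.jar");      // value: original location, surfaced by list_hdfs_resource()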
@@ -1951,18 +2003,24 @@ public void addCleanupItem(Closeable item) {
     private final Map<SessionState.ResourceType, Map<String, Set<String>>> resource_path_map;
     // stores all the downloaded resources as key and the jars which depend on these resources as values in form of a list. Used for deleting transitive dependencies.
     private final Map<SessionState.ResourceType, Map<String, Set<String>>> reverse_resource_path_map;
+    // stores mappings from the local copy to the HDFS location, for all resource types.
+    private final Map<SessionState.ResourceType, Map<String, String>> local_hdfs_resource_map;

     public ResourceMaps() {
       resource_map = new HashMap<SessionState.ResourceType, Set<String>>();
       resource_path_map = new HashMap<SessionState.ResourceType, Map<String, Set<String>>>();
       reverse_resource_path_map = new HashMap<SessionState.ResourceType, Map<String, Set<String>>>();
-
+      local_hdfs_resource_map = new HashMap<SessionState.ResourceType, Map<String, String>>();
     }

     public Map<SessionState.ResourceType, Set<String>> getResourceMap() {
       return resource_map;
     }

+    public Map<SessionState.ResourceType, Map<String, String>> getAllLocalHdfsLocationMap() {
+      return local_hdfs_resource_map;
+    }
+
     public Set<String> getResourceSet(SessionState.ResourceType t) {
       Set<String> result = resource_map.get(t);
       if (result == null) {
@@ -1990,4 +2048,13 @@ public ResourceMaps() {
       return result;
     }

+    public Map<String, String> getLocalHdfsLocationMap(SessionState.ResourceType type) {
+      Map<String, String> result = local_hdfs_resource_map.get(type);
+      if (result == null) {
+        result = new HashMap<String, String>();
+        local_hdfs_resource_map.put(type, result);
+      }
+      return result;
+    }
+
   }
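The per-type map is created lazily by getLocalHdfsLocationMap(), so callers never see null. A sketch of the access pattern as it looks from inside SessionState (ResourceMaps is an internal class; the values are hypothetical):

    ResourceMaps maps = new ResourceMaps();
    // First access creates the JAR map on demand.
    maps.getLocalHdfsLocationMap(SessionState.ResourceType.JAR)
        .put("file:/tmp/hive-session/some-udf.jar", "hdfs:///apps/udfs/some-udf.jar");
    // delete_resources(ResourceType) later drops the whole per-type mapping:
    maps.getAllLocalHdfsLocationMap().remove(SessionState.ResourceType.JAR);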
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
index faf86fb566..42ed302115 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
@@ -59,6 +59,10 @@ public static boolean isIvyUri(String value) throws URISyntaxException {
     return "ivy".equalsIgnoreCase(createURI(value).getScheme());
   }

+  public static boolean isHdfsUri(String value) throws URISyntaxException {
+    return "hdfs".equalsIgnoreCase(createURI(value).getScheme());
+  }
+
   public static boolean isFileUri(String value) {
     String scheme = null;
     try {
@@ -84,7 +88,9 @@ public static boolean isFileUri(String value) {
     switch (getURLType(source)) {
     case FILE: return isLocalAllowed ? Lists.newArrayList(source) : null;
     case IVY: return dependencyResolver.downloadDependencies(source);
-    case OTHER: return Lists.newArrayList(
+    case HDFS:
+    case OTHER:
+      return Lists.newArrayList(
         createURI(downloadResource(source, subDir, convertToUnix)));
     default: throw new AssertionError(getURLType(source));
     }
@@ -117,13 +123,14 @@ private static void ensureDirectory(File resourceDir) {
     }
   }

-  private enum UriType { IVY, FILE, OTHER };
+  private enum UriType { IVY, FILE, HDFS, OTHER };

   private static ResourceDownloader.UriType getURLType(URI value) throws URISyntaxException {
     String scheme = value.getScheme();
     if (scheme == null) return UriType.FILE;
     scheme = scheme.toLowerCase();
     if ("ivy".equals(scheme)) return UriType.IVY;
     if ("file".equals(scheme)) return UriType.FILE;
+    if ("hdfs".equals(scheme)) return UriType.HDFS;
     return UriType.OTHER;
   }
 }

diff --git a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
index dafbe16686..4b74972dd2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
@@ -83,7 +83,7 @@ public void testSanity() throws URISyntaxException, IOException {
     list.add(createURI(TEST_JAR_DIR + "testjar5.jar"));

     //return all the dependency urls
-    Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list);
+    Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list);
     addList.add(query);
     ss.add_resources(t, addList);
     Set<String> dependencies = ss.list_resource(t, null);
@@ -119,7 +119,7 @@ public void testDuplicateAdds() throws URISyntaxException, IOException {

     Collections.sort(list);

-    Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list);
+    Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list);
     for (int i = 0; i < 10; i++) {
       addList.add(query);
     }
@@ -157,8 +157,8 @@ public void testUnion() throws URISyntaxException, IOException {
     list2.add(createURI(TEST_JAR_DIR + "testjar3.jar"));
     list2.add(createURI(TEST_JAR_DIR + "testjar4.jar"));

-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
     addList.add(query1);
     addList.add(query2);
     ss.add_resources(t, addList);
@@ -208,8 +208,8 @@ public void testDeleteJar() throws URISyntaxException, IOException {
     Collections.sort(list1);
     Collections.sort(list2);

-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
     addList.add(query1);
     addList.add(query2);
     ss.add_resources(t, addList);
@@ -267,9 +267,9 @@ public void testDeleteJarMultiple() throws URISyntaxException, IOException {
     Collections.sort(list2);
     Collections.sort(list3);

-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
-    Mockito.when(ss.resolveAndDownload(query3, false)).thenReturn(list3);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query3, false)).thenReturn(list3);
     addList.add(query1);
     addList.add(query2);
     addList.add(query3);
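The cases above only verify that the extra ResourceType argument threads through the mocked resolveAndDownload(). A hypothetical companion test (not part of the patch) could pin down the new scheme check in ResourceDownloader directly:

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import java.net.URISyntaxException;

    import org.apache.hadoop.hive.ql.util.ResourceDownloader;
    import org.junit.Test;

    public class TestIsHdfsUri {
      @Test
      public void testIsHdfsUri() throws URISyntaxException {
        assertTrue(ResourceDownloader.isHdfsUri("hdfs:///apps/udfs/testjar1.jar"));
        // the scheme comparison is case-insensitive
        assertTrue(ResourceDownloader.isHdfsUri("HDFS://namenode:8020/apps/udfs/testjar1.jar"));
        assertFalse(ResourceDownloader.isHdfsUri("file:///tmp/testjar1.jar"));
        // a bare path has no scheme, so it is not treated as HDFS
        assertFalse(ResourceDownloader.isHdfsUri("/tmp/testjar1.jar"));
      }
    }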