Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
None
Description
During job submission, the JobResourceUploader currently iterates over for-loops of -libjars, -files, and -archives sequentially, which can significantly slow down job startup time when a large number of files need to be uploaded, especially if staging the files to a cloud object-store based FileSystem implementation like S3, GCS, WABS, etc., where round-trip latencies may be higher than HDFS despite having good throughput when parallelized:
JobResourceUploader.java
if (files != null) { FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms); String[] fileArr = files.split(","); for (String tmpFile : fileArr) { URI tmpURI = null; try { tmpURI = new URI(tmpFile); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication); try { URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheFile(pathURI, conf); } catch (URISyntaxException ue) { // should not throw a uri exception throw new IOException("Failed to create uri for " + tmpFile, ue); } } } if (libjars != null) { FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms); String[] libjarsArr = libjars.split(","); for (String tmpjars : libjarsArr) { Path tmp = new Path(tmpjars); Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication); DistributedCache.addFileToClassPath( new Path(newPath.toUri().getPath()), conf, jtFs); } } if (archives != null) { FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms); String[] archivesArr = archives.split(","); for (String tmpArchives : archivesArr) { URI tmpURI; try { tmpURI = new URI(tmpArchives); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication); try { URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheArchive(pathURI, conf); } catch (URISyntaxException ue) { // should not throw an uri excpetion throw new IOException("Failed to create uri for " + tmpArchives, ue); } } }
Parallelizing the upload of these files would improve job submission time.