diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 72855cc..637390d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -708,6 +708,11 @@ private static void addDeprecatedKeys() { NM_PREFIX + "localizer.fetch.thread-count"; public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4; + /** Size of each ContainerLocalizer download pool for private fetching.*/ + public static final String NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT = + NM_PREFIX + "localizer.private.fetch.thread-count"; + public static final int DEFAULT_NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT = 4; + /** Where to store container logs.*/ public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index f82f894..e1e6b37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -186,9 +187,18 @@ public LocalizationProtocol run() { } } + int getDownloadThreadCount() { + return conf.getInt( + YarnConfiguration.NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT); + } + ExecutorService createDownloadThreadPool() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("ContainerLocalizer Downloader").build()); + int nThreads = getDownloadThreadCount(); + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("ContainerLocalizer Downloader #%d") + .build(); + return Executors.newFixedThreadPool(nThreads, tf); } CompletionService createCompletionService(ExecutorService exec) { @@ -230,7 +240,10 @@ protected void closeFileSystems(UserGroupInformation ugi) { protected void localizeFiles(LocalizationProtocol nodemanager, CompletionService cs, UserGroupInformation ugi) throws IOException { + int heartbeatCount = 0; + int downloadThreadCount = getDownloadThreadCount(); while (true) { + heartbeatCount ++; try { LocalizerStatus status = createStatus(); LocalizerHeartbeatResponse response = nodemanager.heartbeat(status); @@ -257,7 +270,13 @@ protected void localizeFiles(LocalizationProtocol nodemanager, } catch (YarnException e) { } return; } - cs.poll(1000, TimeUnit.MILLISECONDS); + if (heartbeatCount >= downloadThreadCount) { + // Each heartbeat gives us only 1 resource to download. Skip the wait + // for the first 'threadCount' - 1 heartbeats so every thread gets + // work. Later downloads stay parallel because cs.poll(...) + // returns early when any download finishes before the timeout. 
+ cs.poll(1000, TimeUnit.MILLISECONDS); + } } catch (InterruptedException e) { return; } catch (YarnException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 54c31c2..65c3f1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1068,10 +1068,8 @@ LocalizerHeartbeatResponse processHeartbeat( List rsrcs = new ArrayList(); - /* - * TODO : It doesn't support multiple downloads per ContainerLocalizer - * at the same time. We need to think whether we should support this. - */ + // Return one resource per heartbeat. + // ContainerLocalizer can run multiple heartbeats to get multiple resources LocalResource next = findNextResource(); if (next != null) { try {