diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3f84a23..fe44045 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2752,6 +2752,11 @@ public static boolean areNodeLabelsEnabled(
   public static final String TIMELINE_XFS_OPTIONS =
       TIMELINE_XFS_PREFIX + "xframe-options";

+  /** Number of download threads in each ContainerLocalizer's thread pool. */
+  public static final String NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT = NM_PREFIX
+      + "localizer.private.fetch.thread-count";
+  public static final int DEFAULT_NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT = 4;
+
   public YarnConfiguration() {
     super();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index 04be631..04d3e32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -34,7 +34,9 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -192,10 +194,19 @@ public LocalizationProtocol run() { } } } + + int getDownloadThreadCount() { + return conf.getInt( + YarnConfiguration.NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_PRIVATE_LOCALIZER_FETCH_THREAD_COUNT); + } ExecutorService createDownloadThreadPool() { - return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("ContainerLocalizer Downloader").build()); + int nThreads = getDownloadThreadCount(); + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("ContainerLocalizer Downloader #%d") + .build(); + return Executors.newFixedThreadPool(nThreads, tf); } CompletionService createCompletionService(ExecutorService exec) { @@ -237,7 +248,10 @@ protected void closeFileSystems(UserGroupInformation ugi) { protected void localizeFiles(LocalizationProtocol nodemanager, CompletionService cs, UserGroupInformation ugi) throws IOException, YarnException { + int heartbeatCount = 0; + int downloadThreadCount = getDownloadThreadCount(); while (true) { + heartbeatCount ++; try { LocalizerStatus status = createStatus(); LocalizerHeartbeatResponse response = nodemanager.heartbeat(status); @@ -269,7 +283,13 @@ protected void localizeFiles(LocalizationProtocol nodemanager, } return; } - cs.poll(1000, TimeUnit.MILLISECONDS); + if (heartbeatCount < downloadThreadCount) { + // Each heartbeat gives us only 1 resource to download. Don't wait + // for the first 'threadCount' heartbeats to allow parallel download. + // Subsequent downloads are also parallel because cs.poll(...) + // returns early when any download finishes before the timeout. 
+ cs.poll(1000, TimeUnit.MILLISECONDS); + } } catch (InterruptedException e) { return; } catch (YarnException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4cd1acc..2b0a3b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1107,10 +1107,9 @@ LocalizerHeartbeatResponse processHeartbeat( List rsrcs = new ArrayList(); - /* - * TODO : It doesn't support multiple downloads per ContainerLocalizer - * at the same time. We need to think whether we should support this. - */ + // Return one resource per heartbeat. 
+          // ContainerLocalizer can run multiple heartbeats to get multiple
+          // resources
           LocalResource next = findNextResource();
           if (next != null) {
             try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
index fac7086..a5d90bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
@@ -200,6 +200,117 @@ public boolean matches(Object o) {
         }));
   }

+  /**
+   * Hands out six blocking downloads, one per heartbeat, followed by an
+   * empty response and DIE. The assertions expect the first four downloads
+   * to have been started and the last two never to have run.
+   */
+  @Test(timeout = 20000)
+  public void testMultipleThreadDownload() throws Exception {
+    FileContext fs = FileContext.getLocalFSFileContext();
+    spylfs = spy(fs.getDefaultFileSystem());
+    ContainerLocalizer localizer = setupContainerLocalizerForTest();
+
+    // expected private and application cache directories
+    List<Path> privCacheList = new ArrayList<Path>();
+    List<Path> appCacheList = new ArrayList<Path>();
+    for (Path p : localDirs) {
+      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
+      Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+      privCacheList.add(privcache);
+      Path appDir = new Path(base,
+          new Path(ContainerLocalizer.APPCACHE, appId));
+      Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+      appCacheList.add(appcache);
+    }
+
+    // mock heartbeat responses from NM
+    ResourceLocalizationSpec rsrcA = getMockRsrc(random,
+        LocalResourceVisibility.PRIVATE, privCacheList.get(0));
+    ResourceLocalizationSpec rsrcB = getMockRsrc(random,
+        LocalResourceVisibility.PRIVATE, privCacheList.get(0));
+    ResourceLocalizationSpec rsrcC = getMockRsrc(random,
+        LocalResourceVisibility.APPLICATION, appCacheList.get(0));
+    ResourceLocalizationSpec rsrcD = getMockRsrc(random,
+        LocalResourceVisibility.PRIVATE, privCacheList.get(0));
+    ResourceLocalizationSpec rsrcE = getMockRsrc(random,
+        LocalResourceVisibility.APPLICATION, appCacheList.get(0));
+    ResourceLocalizationSpec rsrcF = getMockRsrc(random,
+        LocalResourceVisibility.PRIVATE, privCacheList.get(0));
+
+    when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcA)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcB)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcC)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcD)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcE)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.singletonList(rsrcF)))
+        .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+            Collections.<ResourceLocalizationSpec> emptyList()))
+        .thenReturn(
+            new MockLocalizerHeartbeatResponse(LocalizerAction.DIE, null));
+
+    LocalResource tRsrcA = rsrcA.getResource();
+    LocalResource tRsrcB = rsrcB.getResource();
+    LocalResource tRsrcC = rsrcC.getResource();
+    LocalResource tRsrcD = rsrcD.getResource();
+    LocalResource tRsrcE = rsrcE.getResource();
+    LocalResource tRsrcF = rsrcF.getResource();
+    FakeLargeDownload download1 = new FakeLargeDownload(
+        rsrcA.getResource().getResource().getFile(), true);
+    FakeLargeDownload download2 = new FakeLargeDownload(
+        rsrcB.getResource().getResource().getFile(), true);
+    FakeLargeDownload download3 = new FakeLargeDownload(
+        rsrcC.getResource().getResource().getFile(), true);
+    FakeLargeDownload download4 = new FakeLargeDownload(
+        rsrcD.getResource().getResource().getFile(), true);
+    FakeLargeDownload download5 = new FakeLargeDownload(
+        rsrcE.getResource().getResource().getFile(), true);
+    FakeLargeDownload download6 = new FakeLargeDownload(
+        rsrcF.getResource().getResource().getFile(), true);
+    doReturn(download1).when(localizer).download(isA(Path.class), eq(tRsrcA),
+        isA(UserGroupInformation.class));
+    doReturn(download2).when(localizer).download(isA(Path.class), eq(tRsrcB),
+        isA(UserGroupInformation.class));
+    doReturn(download3).when(localizer).download(isA(Path.class), eq(tRsrcC),
+        isA(UserGroupInformation.class));
+    doReturn(download4).when(localizer).download(isA(Path.class), eq(tRsrcD),
+        isA(UserGroupInformation.class));
+    doReturn(download5).when(localizer).download(isA(Path.class), eq(tRsrcE),
+        isA(UserGroupInformation.class));
+    doReturn(download6).when(localizer).download(isA(Path.class), eq(tRsrcF),
+        isA(UserGroupInformation.class));
+
+    // run localization
+    localizer.runLocalization(nmAddr);
+    for (Path p : localDirs) {
+      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
+      Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+      // $x/usercache/$user/filecache
+      verify(spylfs).mkdir(eq(privcache), eq(CACHE_DIR_PERM), eq(false));
+      Path appDir = new Path(base,
+          new Path(ContainerLocalizer.APPCACHE, appId));
+      // $x/usercache/$user/appcache/$appId/filecache
+      Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+      verify(spylfs).mkdir(eq(appcache), eq(CACHE_DIR_PERM), eq(false));
+    }
+    // verify tokens read at expected location
+    verify(spylfs).open(tokenPath);
+
+    Assert.assertTrue(download1.called);
+    Assert.assertTrue(download2.called);
+    Assert.assertTrue(download3.called);
+    Assert.assertTrue(download4.called);
+    Assert.assertFalse(download5.called);
+    Assert.assertFalse(download6.called);
+  }
+
   @Test(timeout = 15000)
   public void testMainFailure() throws Exception {

@@ -300,25 +411,6 @@ private ContainerLocalizer setupContainerLocalizerForTest()
     doReturn(nmProxy).when(localizer).getProxy(nmAddr);
     doNothing().when(localizer).sleep(anyInt());

-
-    // return result instantly for deterministic test
-    ExecutorService syncExec = mock(ExecutorService.class);
-    CompletionService<Path> cs = mock(CompletionService.class);
-    when(cs.submit(isA(Callable.class)))
-      .thenAnswer(new Answer<Future<Path>>() {
-          @Override
-          public Future<Path> answer(InvocationOnMock invoc)
-              throws Throwable {
-            Future<Path> done = mock(Future.class);
-            when(done.isDone()).thenReturn(true);
-            FakeDownload d = (FakeDownload) invoc.getArguments()[0];
-            when(done.get()).thenReturn(d.call());
-            return done;
-          }
-        });
-    doReturn(syncExec).when(localizer).createDownloadThreadPool();
-    doReturn(cs).when(localizer).createCompletionService(syncExec);
-
     return localizer;
   }

@@ -348,8 +440,8 @@ public boolean matches(Object o) {
   }

   static class FakeDownload implements Callable<Path> {
-    private final Path localPath;
-    private final boolean succeed;
+    final Path localPath;
+    final boolean succeed;
     FakeDownload(String absPath, boolean succeed) {
       this.localPath = new Path("file:///localcache" + absPath);
       this.succeed = succeed;
@@ -362,6 +454,32 @@ public Path call() throws IOException {
       return localPath;
     }
   }
+
+  /**
+   * A download that records that it was started and then sleeps for ten
+   * seconds, so that it keeps its pool thread busy.
+   */
+  static class FakeLargeDownload extends FakeDownload {
+    // Written by a download thread and read by the test thread.
+    public volatile boolean called;
+    FakeLargeDownload(String absPath, boolean succeed) {
+      super(absPath, succeed);
+    }
+    @Override
+    public Path call() throws IOException {
+      called = true;
+      if (!succeed) {
+        throw new IOException("FAIL " + localPath);
+      }
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag instead of silently swallowing it.
+        Thread.currentThread().interrupt();
+      }
+      return localPath;
+    }
+  }

   static RecordFactory getMockLocalizerRecordFactory() {
     RecordFactory mockRF = mock(RecordFactory.class);