diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index b2caba0..91c6c5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -129,14 +129,10 @@ public void test() { dispatcher.await(); verifyTrackedResourceCount(tracker, 2); - // Verify resources in state INIT with ref-count=0 is removed. - Assert.assertTrue(tracker.remove(lr2, mockDelService)); - verifyTrackedResourceCount(tracker, 1); - // Verify resource with non zero ref count is not removed. Assert.assertEquals(2, lr1.getRefCount()); Assert.assertFalse(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 1); + verifyTrackedResourceCount(tracker, 2); // Localize resource1 ResourceLocalizedEvent rle = @@ -151,7 +147,7 @@ public void test() { // Verify resources in state LOCALIZED with ref-count=0 is removed. Assert.assertTrue(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 0); + verifyTrackedResourceCount(tracker, 1); } finally { if (dispatcher != null) { dispatcher.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index 730f74a..e42702a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -117,7 +117,7 @@ public void testNotification() throws Exception { local.handle(new ResourceReleaseEvent(rsrcA, container1)); dispatcher.await(); verify(containerBus, never()).handle(isA(ContainerEvent.class)); - assertEquals(ResourceState.INIT, local.getState()); + assertEquals(ResourceState.DOWNLOADING, local.getState()); // Register C2, C3 final ContainerId container2 = getMockContainer(2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 77bde7b..2b3888b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -34,9 +34,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; @@ -53,6 +53,8 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import junit.framework.Assert; @@ -97,13 +99,18 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; @@ -677,6 +684,304 @@ public Void answer(InvocationOnMock invocation) throws IOException { } } + @Test(timeout = 100000) + public void testParallelDownloadAttemptsForPrivateResource() throws Exception { + + DrainDispatcher dispatcher1 = null, dispatcher2 = null; + try { + dispatcher1 = new DrainDispatcher(); + dispatcher2 = new DrainDispatcher(); + + // Creating a mocked localizer context. Not providing container-id as it + // is not checked in findNextResource call. + LocalizerContext cxt = + new LocalizerContext("testuser", BuilderUtils.newContainerId(1, 1, 1, + 1), null); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher2, + mock(ContainerExecutor.class), mock(DeletionService.class), + localDirHandler); + rls.init(conf); + + Application app = mock(Application.class); + when(app.getUser()).thenReturn("testuser"); + when(app.getAppId()).thenReturn(BuilderUtils.newApplicationId(1, 1)); + ApplicationLocalizationEvent appLocalizationEvent = + new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app); + rls.handle(appLocalizationEvent); + // Creating localizer runner per container + LocalizerRunner localizerRunner1 = + rls.new LocalizerRunner(cxt, "container-1"); + LocalizerRunner localizerRunner2 = + rls.new LocalizerRunner(cxt, "container-2"); + + // Initially pending list and scheduled map should be empty. + Assert.assertEquals(0, localizerRunner1.pending.size()); + Assert.assertEquals(0, localizerRunner1.scheduled.size()); + Assert.assertEquals(0, localizerRunner2.pending.size()); + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + + // Setting up LocalizedResource and corresponding request + LocalResourceRequest req1 = + new LocalResourceRequest(new Path("file:///tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + + LocalizedResource lr = new LocalizedResource(req1, dispatcher1); + // Creating a request for the localized resource. + ResourceRequestEvent reqEvent = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, cxt); + lr.handle(reqEvent); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + // Resource should have one permit + Assert.assertEquals(1, lr.sem.availablePermits()); + + // adding resource request event to pending list of container-1 + LocalizerResourceRequestEvent localizerEvent1 = + new LocalizerResourceRequestEvent(lr, + LocalResourceVisibility.PRIVATE, cxt, null); + localizerRunner1.pending.add(localizerEvent1); + + // adding resource request event to pending list of container-2 + LocalizerResourceRequestEvent localizerEvent2 = + new LocalizerResourceRequestEvent(lr, + LocalResourceVisibility.PRIVATE, cxt, null); + localizerRunner2.pending.add(localizerEvent2); + + // Container-1 will try to retrieve resource for localization. It should + // return resource as the resource is in DOWNLOADING state and available + // permit = 1 + + List statusList = + new ArrayList(); + LocalizerHeartbeatResponse response1 = + localizerRunner1.update(statusList); + + // Resource must have been added to scheduled map + Assert.assertEquals(1, localizerRunner1.scheduled.size()); + // Checking resource in the response and also available permits for it. + Assert.assertEquals(req1.getResource(), + response1.getResourceSpecs().get(0).getResource().getResource()); + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Container-2 will try to retreive resource for localization. It should + // not return the resource as even though resource is in DOWNLOADING state + // still lock can not be acquired again. + LocalizerHeartbeatResponse response2 = + localizerRunner2.update(statusList); + // Resource must not have been added to scheduled map + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + // Resource Localization is failed. As a part of it + // resource state is changed and then lock is released. + ResourceFailedLocalizationEvent locFailedEvent = + new ResourceFailedLocalizationEvent(req1, new Exception("test")); + // verifying state before event + Assert.assertTrue(lr.getState().equals(ResourceState.DOWNLOADING)); + + lr.handle(locFailedEvent); + + // verifing state change after event + Assert.assertFalse(lr.getState().equals(ResourceState.DOWNLOADING)); + + // releasing lock as a part of failed download. + lr.unlock(); + + // Now container-2 again try to download the resource it should still + // not get the resource as the resource is now not in DOWNLOADING state. + response2 = localizerRunner2.update(statusList); + + // Resource must not have been added to scheduled map + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + if (dispatcher2 != null) { + dispatcher2.stop(); + } + } + } + + @Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testParallelDownloadAttemptsForPublicResource() throws Exception { + + DrainDispatcher dispatcher1 = null, dispatcher2 = null; + try { + // Setting up ResourceLocalization service. + Configuration conf = new Configuration(); + dispatcher1 = new DrainDispatcher(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + // Creating and initializing ResourceLocalizationService but not starting + // it as otherwise it will remove requests from pending queue. + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher1, exec, delService, + dirsHandler); + ResourceLocalizationService spyService = spy(rawService); + + spyService.init(conf); + + // We need to track this map for all the requests which are populated into + // it. + Map, LocalizerResourceRequestEvent> pending = + new HashMap, LocalizerResourceRequestEvent>(); + PublicLocalizer publicLocalizer = + spyService.new PublicLocalizer(conf, + FileContext.getFileContext(conf), mock(ExecutorService.class), + pending); + + // Creating a mocked localizer context. Not providing container-id as it + // is not checked in addResource call. + LocalizerContext cxt = + new LocalizerContext("testuser", mock(ContainerId.class), null); + + // Initially pending map should be empty + Assert.assertEquals(0, pending.size()); + + // Setting up LocalizedResource and corresponding request + LocalResourceRequest req1 = + new LocalResourceRequest(new Path("/tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + + dispatcher2 = new DrainDispatcher(); + LocalizedResource lr = new LocalizedResource(req1, dispatcher2); + + // Creating a request for the localized resource. + ResourceRequestEvent reqEvent = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, cxt); + lr.handle(reqEvent); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + + // Initially resource should have 1 permit + Assert.assertEquals(1, lr.sem.availablePermits()); + + // First resource Localization request is made + LocalizerResourceRequestEvent localizerEvent1 = + new LocalizerResourceRequestEvent(lr, null, cxt, null); + publicLocalizer.addResource(localizerEvent1); + + // pending should have this resource now. + Assert.assertEquals(1, pending.size()); + // Now resource should have 0 permit. + Assert.assertEquals(0, lr.sem.availablePermits()); + + // One more request is made before resource localization is either + // completed / Failed. This should not start new download as download is + // already in progress. + LocalizerResourceRequestEvent localizerEvent2 = + new LocalizerResourceRequestEvent(lr, null, cxt, null); + publicLocalizer.addResource(localizerEvent2); + + // pending should not have one added request. + Assert.assertEquals(1, pending.size()); + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Resource Localization is successful/ failed. As a part of it + // resource state is changed and then lock is released. + ResourceFailedLocalizationEvent locFailedEvent = + new ResourceFailedLocalizationEvent(req1, new Exception("test")); + // verifying state before event + Assert.assertTrue(lr.getState().equals(ResourceState.DOWNLOADING)); + + lr.handle(locFailedEvent); + + // verifing state change after event + Assert.assertFalse(lr.getState().equals(ResourceState.DOWNLOADING)); + + // releasing lock as a part of failed download. + lr.unlock(); + + // One more request is made after resource localization is either + // completed / Failed. This should not start new download as resource + // state + // has changed even though lock is available. + LocalizerResourceRequestEvent localizerEvent3 = + new LocalizerResourceRequestEvent(lr, null, cxt, null); + publicLocalizer.addResource(localizerEvent3); + + // pending should not have one added request. + Assert.assertEquals(1, pending.size()); + // acquiring a lock should succeed + Assert.assertEquals(1, lr.sem.availablePermits()); + // verifying resource is not in DOWNLOADING state + Assert.assertFalse(lr.getState().equals(ResourceState.DOWNLOADING)); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + if (dispatcher2 != null) { + dispatcher2.stop(); + } + } + + } + private static URL getPath(String path) { URL url = BuilderUtils.newURL("file", null, 0, path); return url;