diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index f0cd87b..22304fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -78,19 +78,14 @@ // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) - .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED, new FetchDirectTransition()) - .addTransition(ResourceState.INIT, ResourceState.INIT, - ResourceEventType.RELEASE, new ReleaseTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!! 
.addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, ResourceEventType.LOCALIZED, new FetchSuccessTransition()) - .addTransition(ResourceState.DOWNLOADING, - EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), - ResourceEventType.RELEASE, new ReleasePendingTransition()) + .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING, + ResourceEventType.RELEASE, new ReleaseTransition()) .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) @@ -98,8 +93,6 @@ .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.REQUEST, new LocalizedResourceTransition()) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED) - .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.RELEASE, new ReleaseTransition()) .installTopology(); @@ -230,14 +223,6 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { } } - private static class FetchDirectTransition extends FetchSuccessTransition { - @Override - public void transition(LocalizedResource rsrc, ResourceEvent event) { - LOG.warn("Resource " + rsrc + " localized without listening container"); - super.transition(rsrc, event); - } - } - /** * Resource localized, notify waiting containers. */ @@ -304,17 +289,4 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { rsrc.release(relEvent.getContainer()); } } - - private static class ReleasePendingTransition implements - MultipleArcTransition { - @Override - public ResourceState transition(LocalizedResource rsrc, - ResourceEvent event) { - ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; - rsrc.release(relEvent.getContainer()); - return rsrc.ref.isEmpty() - ? 
ResourceState.INIT - : ResourceState.DOWNLOADING; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 7b9873a..2bb7404 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -31,7 +31,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; @@ -606,41 +605,20 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { final ExecutorService threadPool; final CompletionService queue; final Map,LocalizerResourceRequestEvent> pending; - // TODO hack to work around broken signaling - final Map> attempts; PublicLocalizer(Configuration conf) { this(conf, getLocalFileContext(conf), createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>(), - new HashMap>()); + new HashMap,LocalizerResourceRequestEvent>()); } PublicLocalizer(Configuration conf, FileContext lfs, ExecutorService threadPool, - Map,LocalizerResourceRequestEvent> pending, - Map> attempts) { + Map,LocalizerResourceRequestEvent> pending) { super("Public Localizer"); this.lfs = lfs; this.conf = conf; this.pending = pending; - this.attempts = attempts; -// List localDirs = dirsHandler.getLocalDirs(); -// 
String[] publicFilecache = new String[localDirs.size()]; -// for (int i = 0, n = localDirs.size(); i < n; ++i) { -// publicFilecache[i] = -// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString(); -// } -// conf.setStrings(PUBCACHE_CTXT, publicFilecache); - -// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf); -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// int i = 0; -// for (String localDir : localDirs) { -// publicFilecache[i++] = -// new Path(localDir, ContainerLocalizer.FILECACHE).toString(); -// } this.threadPool = threadPool; this.queue = new ExecutorCompletionService(threadPool); @@ -648,36 +626,45 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { public void addResource(LocalizerResourceRequestEvent request) { // TODO handle failures, cancellation, requests by other containers - LocalResourceRequest key = request.getResource().getRequest(); + LocalizedResource rsrc = request.getResource(); + LocalResourceRequest key = rsrc.getRequest(); LOG.info("Downloading public rsrc:" + key); - synchronized (attempts) { - List sigh = attempts.get(key); - if (null == sigh) { + /* + * Here multiple containers may request the same resource. So we need + * to start downloading only when + * 1) ResourceState == DOWNLOADING + * 2) We are able to acquire non blocking semaphore lock. + * If not we will skip this resource as either it is getting downloaded + * or it FAILED / LOCALIZED. + */ + + if (rsrc.tryAcquire()) { + if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { LocalResource resource = request.getResource().getRequest(); try { - Path publicDirDestPath = dirsHandler.getLocalPathForWrite( - "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, - ContainerLocalizer.getEstimatedSize(resource), true); + Path publicDirDestPath = + dirsHandler.getLocalPathForWrite("." 
+ Path.SEPARATOR + + ContainerLocalizer.FILECACHE, + ContainerLocalizer.getEstimatedSize(resource), true); Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); + publicRsrc.getPathForLocalization(key, publicDirDestPath); if (!hierarchicalPath.equals(publicDirDestPath)) { publicDirDestPath = hierarchicalPath; - DiskChecker.checkDir( - new File(publicDirDestPath.toUri().getPath())); + DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } publicDirDestPath = new Path(publicDirDestPath, Long.toString(publicRsrc .nextUniqueNumber())); - pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirDestPath, resource)), - request); - attempts.put(key, new LinkedList()); + pending.put(queue.submit(new FSDownload(lfs, null, conf, + publicDirDestPath, resource)), request); } catch (IOException e) { + rsrc.unlock(); + // TODO Need to Fix IO Exceptions - Notifying resource LOG.error("Local path for public localization is not found. " + " May be disks failed.", e); } } else { - sigh.add(request); + rsrc.unlock(); } } } @@ -700,24 +687,14 @@ public void run() { LocalResourceRequest key = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil .getDU(new File(local.toUri())))); - synchronized (attempts) { - attempts.remove(key); - } + assoc.getResource().unlock(); } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); LocalResourceRequest req = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e .getCause())); - synchronized (attempts) { - List reqs; - reqs = attempts.get(req); - if (null == reqs) { - LOG.error("Missing pending list for " + req); - return; - } - attempts.remove(req); - } + assoc.getResource().unlock(); } catch (CancellationException e) { // ignore; shutting down } @@ -780,18 +757,29 @@ private LocalResource findNextResource() { i.remove(); continue; } + /* + * Multiple 
containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ if (nRsrc.tryAcquire()) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource( - ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; + if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc + .getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + scheduled.put(nextRsrc, evt); + return next; + } else { + // Need to release acquired lock + nRsrc.unlock(); + } } } return null; @@ -863,6 +851,12 @@ LocalizerHeartbeatResponse update( new ResourceLocalizedEvent(req, ConverterUtils .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); } catch (URISyntaxException e) { } + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + if (pending.isEmpty()) { // TODO: Synchronization response.setLocalizerAction(LocalizerAction.DIE); @@ -889,11 +883,16 @@ LocalizerHeartbeatResponse update( break; case FETCH_FAILURE: LOG.info("DEBUG: FAILED " + req, stat.getException()); - assoc.getResource().unlock(); response.setLocalizerAction(LocalizerAction.DIE); getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle( new 
ResourceFailedLocalizationEvent(req, stat.getException())); + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + break; default: LOG.info("Unknown status: " + stat.getStatus()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index b2caba0..91c6c5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -129,14 +129,10 @@ public void test() { dispatcher.await(); verifyTrackedResourceCount(tracker, 2); - // Verify resources in state INIT with ref-count=0 is removed. - Assert.assertTrue(tracker.remove(lr2, mockDelService)); - verifyTrackedResourceCount(tracker, 1); - // Verify resource with non zero ref count is not removed. Assert.assertEquals(2, lr1.getRefCount()); Assert.assertFalse(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 1); + verifyTrackedResourceCount(tracker, 2); // Localize resource1 ResourceLocalizedEvent rle = @@ -151,7 +147,7 @@ public void test() { // Verify resources in state LOCALIZED with ref-count=0 is removed. 
Assert.assertTrue(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 0); + verifyTrackedResourceCount(tracker, 1); } finally { if (dispatcher != null) { dispatcher.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index 07d8df1..e42702a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -117,7 +117,7 @@ public void testNotification() throws Exception { local.handle(new ResourceReleaseEvent(rsrcA, container1)); dispatcher.await(); verify(containerBus, never()).handle(isA(ContainerEvent.class)); - assertEquals(ResourceState.INIT, local.getState()); + assertEquals(ResourceState.DOWNLOADING, local.getState()); // Register C2, C3 final ContainerId container2 = getMockContainer(2); @@ -176,24 +176,6 @@ public void testNotification() throws Exception { } } - @Test - public void testDirectLocalization() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(new Configuration()); - try { - dispatcher.start(); - LocalResource apiRsrc = createMockResource(); - LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc); - LocalizedResource local = new LocalizedResource(rsrcA, dispatcher); - Path p = new Path("file:///cache/rsrcA"); - local.handle(new ResourceLocalizedEvent(rsrcA, p, 10)); - dispatcher.await(); - 
assertEquals(ResourceState.LOCALIZED, local.getState()); - } finally { - dispatcher.stop(); - } - } - static LocalResource createMockResource() { // mock rsrc location org.apache.hadoop.yarn.api.records.URL uriA = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 77bde7b..2b3888b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -34,9 +34,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; @@ -53,6 +53,8 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import junit.framework.Assert; @@ -97,13 +99,18 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; @@ -677,6 +684,304 @@ public Void answer(InvocationOnMock invocation) throws IOException { } } + @Test(timeout = 100000) + public void testParallelDownloadAttemptsForPrivateResource() throws Exception { + + DrainDispatcher dispatcher1 = null, dispatcher2 = null; + try { + dispatcher1 = new DrainDispatcher(); + dispatcher2 = new DrainDispatcher(); + + // Creating a mocked localizer context. 
Not providing container-id as it + // is not checked in findNextResource call. + LocalizerContext cxt = + new LocalizerContext("testuser", BuilderUtils.newContainerId(1, 1, 1, + 1), null); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher2, + mock(ContainerExecutor.class), mock(DeletionService.class), + localDirHandler); + rls.init(conf); + + Application app = mock(Application.class); + when(app.getUser()).thenReturn("testuser"); + when(app.getAppId()).thenReturn(BuilderUtils.newApplicationId(1, 1)); + ApplicationLocalizationEvent appLocalizationEvent = + new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app); + rls.handle(appLocalizationEvent); + // Creating localizer runner per container + LocalizerRunner localizerRunner1 = + rls.new LocalizerRunner(cxt, "container-1"); + LocalizerRunner localizerRunner2 = + rls.new LocalizerRunner(cxt, "container-2"); + + // Initially pending list and scheduled map should be empty. 
+ Assert.assertEquals(0, localizerRunner1.pending.size()); + Assert.assertEquals(0, localizerRunner1.scheduled.size()); + Assert.assertEquals(0, localizerRunner2.pending.size()); + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + + // Setting up LocalizedResource and corresponding request + LocalResourceRequest req1 = + new LocalResourceRequest(new Path("file:///tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + + LocalizedResource lr = new LocalizedResource(req1, dispatcher1); + // Creating a request for the localized resource. + ResourceRequestEvent reqEvent = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, cxt); + lr.handle(reqEvent); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + // Resource should have one permit + Assert.assertEquals(1, lr.sem.availablePermits()); + + // adding resource request event to pending list of container-1 + LocalizerResourceRequestEvent localizerEvent1 = + new LocalizerResourceRequestEvent(lr, + LocalResourceVisibility.PRIVATE, cxt, null); + localizerRunner1.pending.add(localizerEvent1); + + // adding resource request event to pending list of container-2 + LocalizerResourceRequestEvent localizerEvent2 = + new LocalizerResourceRequestEvent(lr, + LocalResourceVisibility.PRIVATE, cxt, null); + localizerRunner2.pending.add(localizerEvent2); + + // Container-1 will try to retrieve resource for localization. It should + // return resource as the resource is in DOWNLOADING state and available + // permit = 1 + + List statusList = + new ArrayList(); + LocalizerHeartbeatResponse response1 = + localizerRunner1.update(statusList); + + // Resource must have been added to scheduled map + Assert.assertEquals(1, localizerRunner1.scheduled.size()); + // Checking resource in the response and also available permits for it. 
+ Assert.assertEquals(req1.getResource(), + response1.getResourceSpecs().get(0).getResource().getResource()); + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Container-2 will try to retrieve resource for localization. It should + // not return the resource as even though resource is in DOWNLOADING state + // still lock can not be acquired again. + LocalizerHeartbeatResponse response2 = + localizerRunner2.update(statusList); + // Resource must not have been added to scheduled map + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + // Resource Localization is failed. As a part of it + // resource state is changed and then lock is released. + ResourceFailedLocalizationEvent locFailedEvent = + new ResourceFailedLocalizationEvent(req1, new Exception("test")); + // verifying state before event + Assert.assertTrue(lr.getState().equals(ResourceState.DOWNLOADING)); + + lr.handle(locFailedEvent); + + // verifying state change after event + Assert.assertFalse(lr.getState().equals(ResourceState.DOWNLOADING)); + + // releasing lock as a part of failed download. + lr.unlock(); + + // Now container-2 again try to download the resource it should still + // not get the resource as the resource is now not in DOWNLOADING state. + response2 = localizerRunner2.update(statusList); + + // Resource must not have been added to scheduled map + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + if (dispatcher2 != null) { + dispatcher2.stop(); + } + } + } + + @Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testParallelDownloadAttemptsForPublicResource() throws Exception { + + DrainDispatcher dispatcher1 = null, dispatcher2 = null; + try { + // Setting up ResourceLocalization service. 
+ Configuration conf = new Configuration(); + dispatcher1 = new DrainDispatcher(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + // Creating and initializing ResourceLocalizationService but not starting + // it as otherwise it will remove requests from pending queue. + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher1, exec, delService, + dirsHandler); + ResourceLocalizationService spyService = spy(rawService); + + spyService.init(conf); + + // We need to track this map for all the requests which are populated into + // it. 
+ Map, LocalizerResourceRequestEvent> pending = + new HashMap, LocalizerResourceRequestEvent>(); + PublicLocalizer publicLocalizer = + spyService.new PublicLocalizer(conf, + FileContext.getFileContext(conf), mock(ExecutorService.class), + pending); + + // Creating a mocked localizer context. Not providing container-id as it + // is not checked in addResource call. + LocalizerContext cxt = + new LocalizerContext("testuser", mock(ContainerId.class), null); + + // Initially pending map should be empty + Assert.assertEquals(0, pending.size()); + + // Setting up LocalizedResource and corresponding request + LocalResourceRequest req1 = + new LocalResourceRequest(new Path("/tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + + dispatcher2 = new DrainDispatcher(); + LocalizedResource lr = new LocalizedResource(req1, dispatcher2); + + // Creating a request for the localized resource. + ResourceRequestEvent reqEvent = + new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, cxt); + lr.handle(reqEvent); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + + // Initially resource should have 1 permit + Assert.assertEquals(1, lr.sem.availablePermits()); + + // First resource Localization request is made + LocalizerResourceRequestEvent localizerEvent1 = + new LocalizerResourceRequestEvent(lr, null, cxt, null); + publicLocalizer.addResource(localizerEvent1); + + // pending should have this resource now. + Assert.assertEquals(1, pending.size()); + // Now resource should have 0 permit. + Assert.assertEquals(0, lr.sem.availablePermits()); + + // One more request is made before resource localization is either + // completed / Failed. This should not start new download as download is + // already in progress. 
+ LocalizerResourceRequestEvent localizerEvent2 = + new LocalizerResourceRequestEvent(lr, null, cxt, null); + publicLocalizer.addResource(localizerEvent2); + + // pending should not have one added request. + Assert.assertEquals(1, pending.size()); + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Resource Localization is successful/ failed. As a part of it + // resource state is changed and then lock is released. + ResourceFailedLocalizationEvent locFailedEvent = + new ResourceFailedLocalizationEvent(req1, new Exception("test")); + // verifying state before event + Assert.assertTrue(lr.getState().equals(ResourceState.DOWNLOADING)); + + lr.handle(locFailedEvent); + + // verifying state change after event + Assert.assertFalse(lr.getState().equals(ResourceState.DOWNLOADING)); + + // releasing lock as a part of failed download. + lr.unlock(); + + // One more request is made after resource localization is either + // completed / Failed. This should not start new download as resource + // state + // has changed even though lock is available. + LocalizerResourceRequestEvent localizerEvent3 = + new LocalizerResourceRequestEvent(lr, null, cxt, null); + publicLocalizer.addResource(localizerEvent3); + + // pending should not have one added request. + Assert.assertEquals(1, pending.size()); + // acquiring a lock should succeed + Assert.assertEquals(1, lr.sem.availablePermits()); + // verifying resource is not in DOWNLOADING state + Assert.assertFalse(lr.getState().equals(ResourceState.DOWNLOADING)); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + if (dispatcher2 != null) { + dispatcher2.stop(); + } + } + + } + private static URL getPath(String path) { URL url = BuilderUtils.newURL("file", null, 0, path); return url;