diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 98ec471..7d00d94 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import com.google.common.annotations.VisibleForTesting; + /** * Component tracking resources all of the same {@link LocalResourceVisibility} * @@ -41,4 +44,8 @@ String getUser(); long nextUniqueNumber(); + + @VisibleForTesting + @Private + LocalizedResource getLocalizedResource(LocalResourceRequest request); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 786b58c..dfbeb44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; +import com.google.common.annotations.VisibleForTesting; + /** * A collection of {@link LocalizedResource}s all of same @@ -307,4 +310,11 @@ public String getUser() { public long nextUniqueNumber() { return uniqueNumberGenerator.incrementAndGet(); } + + @VisibleForTesting + @Private + @Override + public LocalizedResource getLocalizedResource(LocalResourceRequest request) { + return localrsrc.get(request); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index f0cd87b..22304fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -78,19 +78,14 @@ // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) - .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED, new FetchDirectTransition()) - .addTransition(ResourceState.INIT, ResourceState.INIT, - ResourceEventType.RELEASE, new ReleaseTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!! .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, ResourceEventType.LOCALIZED, new FetchSuccessTransition()) - .addTransition(ResourceState.DOWNLOADING, - EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), - ResourceEventType.RELEASE, new ReleasePendingTransition()) + .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING, + ResourceEventType.RELEASE, new ReleaseTransition()) .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) @@ -98,8 +93,6 @@ .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.REQUEST, new LocalizedResourceTransition()) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED) - .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.RELEASE, new ReleaseTransition()) .installTopology(); @@ -230,14 +223,6 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { } } - private static class FetchDirectTransition extends FetchSuccessTransition { - @Override - public void transition(LocalizedResource rsrc, ResourceEvent event) { - LOG.warn("Resource " + rsrc + " localized without listening container"); - super.transition(rsrc, event); - } - } - /** * Resource localized, notify waiting containers. */ @@ -304,17 +289,4 @@ public void transition(LocalizedResource rsrc, ResourceEvent event) { rsrc.release(relEvent.getContainer()); } } - - private static class ReleasePendingTransition implements - MultipleArcTransition { - @Override - public ResourceState transition(LocalizedResource rsrc, - ResourceEvent event) { - ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; - rsrc.release(relEvent.getContainer()); - return rsrc.ref.isEmpty() - ? ResourceState.INIT - : ResourceState.DOWNLOADING; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 7b9873a..c852de7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -31,7 +31,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; @@ -47,9 +46,11 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; @@ -112,6 +113,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ResourceLocalizationService extends CompositeService @@ -492,7 +494,25 @@ private String getUserAppCachePath(String user, String appId) { + Path.SEPARATOR + appId; return path; } + + @VisibleForTesting + @Private + public PublicLocalizer getPublicLocalizer() { + return localizerTracker.publicLocalizer; + } + @VisibleForTesting + @Private + public LocalizerRunner getLocalizerRunner(String locId) { + return localizerTracker.privLocalizers.get(locId); + } + + @VisibleForTesting + @Private + public Map getPrivateLocalizers() { + return localizerTracker.privLocalizers; + } + /** * Sub-component handling the spawning of {@link ContainerLocalizer}s */ @@ -606,41 +626,20 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { final ExecutorService threadPool; final CompletionService queue; final Map,LocalizerResourceRequestEvent> pending; - // TODO hack to work around broken signaling - final Map> attempts; PublicLocalizer(Configuration conf) { this(conf, getLocalFileContext(conf), createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>(), - new HashMap>()); + new HashMap,LocalizerResourceRequestEvent>()); } PublicLocalizer(Configuration conf, FileContext lfs, ExecutorService threadPool, - Map,LocalizerResourceRequestEvent> pending, - Map> attempts) { + Map,LocalizerResourceRequestEvent> pending) { super("Public Localizer"); this.lfs = lfs; this.conf = conf; this.pending = pending; - this.attempts = attempts; -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// for (int i = 0, n = localDirs.size(); i < n; ++i) { -// publicFilecache[i] = -// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString(); -// } -// conf.setStrings(PUBCACHE_CTXT, publicFilecache); - -// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf); -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// int i = 0; -// for (String localDir : localDirs) { -// publicFilecache[i++] = -// new Path(localDir, ContainerLocalizer.FILECACHE).toString(); -// } this.threadPool = threadPool; this.queue = new ExecutorCompletionService(threadPool); @@ -648,36 +647,45 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) { public void addResource(LocalizerResourceRequestEvent request) { // TODO handle failures, cancellation, requests by other containers - LocalResourceRequest key = request.getResource().getRequest(); + LocalizedResource rsrc = request.getResource(); + LocalResourceRequest key = rsrc.getRequest(); LOG.info("Downloading public rsrc:" + key); - synchronized (attempts) { - List sigh = attempts.get(key); - if (null == sigh) { + /* + * Here multiple containers may request the same resource. So we need + * to start downloading only when + * 1) ResourceState == DOWNLOADING + * 2) We are able to acquire non blocking semaphore lock. + * If not we will skip this resource as either it is getting downloaded + * or it FAILED / LOCALIZED. + */ + + if (rsrc.tryAcquire()) { + if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { LocalResource resource = request.getResource().getRequest(); try { - Path publicDirDestPath = dirsHandler.getLocalPathForWrite( - "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, - ContainerLocalizer.getEstimatedSize(resource), true); + Path publicDirDestPath = + dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR + + ContainerLocalizer.FILECACHE, + ContainerLocalizer.getEstimatedSize(resource), true); Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); + publicRsrc.getPathForLocalization(key, publicDirDestPath); if (!hierarchicalPath.equals(publicDirDestPath)) { publicDirDestPath = hierarchicalPath; - DiskChecker.checkDir( - new File(publicDirDestPath.toUri().getPath())); + DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } publicDirDestPath = new Path(publicDirDestPath, Long.toString(publicRsrc .nextUniqueNumber())); - pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirDestPath, resource)), - request); - attempts.put(key, new LinkedList()); + pending.put(queue.submit(new FSDownload(lfs, null, conf, + publicDirDestPath, resource)), request); } catch (IOException e) { + rsrc.unlock(); + // TODO Need to Fix IO Exceptions - Notifying resource LOG.error("Local path for public localization is not found. " + " May be disks failed.", e); } } else { - sigh.add(request); + rsrc.unlock(); } } } @@ -700,24 +708,14 @@ public void run() { LocalResourceRequest key = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil .getDU(new File(local.toUri())))); - synchronized (attempts) { - attempts.remove(key); - } + assoc.getResource().unlock(); } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); LocalResourceRequest req = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e .getCause())); - synchronized (attempts) { - List reqs; - reqs = attempts.get(req); - if (null == reqs) { - LOG.error("Missing pending list for " + req); - return; - } - attempts.remove(req); - } + assoc.getResource().unlock(); } catch (CancellationException e) { // ignore; shutting down } @@ -776,22 +774,35 @@ private LocalResource findNextResource() { i.hasNext();) { LocalizerResourceRequestEvent evt = i.next(); LocalizedResource nRsrc = evt.getResource(); - if (ResourceState.LOCALIZED.equals(nRsrc.getState())) { + // Resource download should take place ONLY if resource is in + // Downloading state + if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { i.remove(); continue; } + /* + * Multiple containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ if (nRsrc.tryAcquire()) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource( - ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; + if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc + .getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + scheduled.put(nextRsrc, evt); + return next; + } else { + // Need to release acquired lock + nRsrc.unlock(); + } } } return null; @@ -863,6 +874,12 @@ LocalizerHeartbeatResponse update( new ResourceLocalizedEvent(req, ConverterUtils .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); } catch (URISyntaxException e) { } + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + if (pending.isEmpty()) { // TODO: Synchronization response.setLocalizerAction(LocalizerAction.DIE); @@ -889,11 +906,16 @@ LocalizerHeartbeatResponse update( break; case FETCH_FAILURE: LOG.info("DEBUG: FAILED " + req, stat.getException()); - assoc.getResource().unlock(); response.setLocalizerAction(LocalizerAction.DIE); getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle( new ResourceFailedLocalizationEvent(req, stat.getException())); + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + break; default: LOG.info("Unknown status: " + stat.getStatus()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index b2caba0..91c6c5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -129,14 +129,10 @@ public void test() { dispatcher.await(); verifyTrackedResourceCount(tracker, 2); - // Verify resources in state INIT with ref-count=0 is removed. - Assert.assertTrue(tracker.remove(lr2, mockDelService)); - verifyTrackedResourceCount(tracker, 1); - // Verify resource with non zero ref count is not removed. Assert.assertEquals(2, lr1.getRefCount()); Assert.assertFalse(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 1); + verifyTrackedResourceCount(tracker, 2); // Localize resource1 ResourceLocalizedEvent rle = @@ -151,7 +147,7 @@ public void test() { // Verify resources in state LOCALIZED with ref-count=0 is removed. Assert.assertTrue(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 0); + verifyTrackedResourceCount(tracker, 1); } finally { if (dispatcher != null) { dispatcher.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index 07d8df1..e42702a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -117,7 +117,7 @@ public void testNotification() throws Exception { local.handle(new ResourceReleaseEvent(rsrcA, container1)); dispatcher.await(); verify(containerBus, never()).handle(isA(ContainerEvent.class)); - assertEquals(ResourceState.INIT, local.getState()); + assertEquals(ResourceState.DOWNLOADING, local.getState()); // Register C2, C3 final ContainerId container2 = getMockContainer(2); @@ -176,24 +176,6 @@ public void testNotification() throws Exception { } } - @Test - public void testDirectLocalization() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(new Configuration()); - try { - dispatcher.start(); - LocalResource apiRsrc = createMockResource(); - LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc); - LocalizedResource local = new LocalizedResource(rsrcA, dispatcher); - Path p = new Path("file:///cache/rsrcA"); - local.handle(new ResourceLocalizedEvent(rsrcA, p, 10)); - dispatcher.await(); - assertEquals(ResourceState.LOCALIZED, local.getState()); - } finally { - dispatcher.stop(); - } - } - static LocalResource createMockResource() { // mock rsrc location org.apache.hadoop.yarn.api.records.URL uriA = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 77bde7b..80ff726 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -34,9 +34,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; @@ -53,6 +53,7 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import junit.framework.Assert; @@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -90,20 +92,28 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; @@ -677,6 +687,481 @@ public Void answer(InvocationOnMock invocation) throws IOException { } } + @Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testParallelDownloadAttemptsForPrivateResource() throws Exception { + + DrainDispatcher dispatcher1 = null; + try { + dispatcher1 = new DrainDispatcher(); + String user = "testuser"; + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher1, exec, delService, + localDirHandler); + dispatcher1.register(LocalizationEventType.class, rls); + rls.init(conf); + + rls.handle(createApplicationLocalizationEvent(user, appId)); + + LocalResourceRequest req = + new LocalResourceRequest(new Path("file:///tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + + // We need to pre-populate the LocalizerRunner as the + // Resource Localization Service code internally starts them which + // definitely we don't want. + + // creating new containers and populating corresponding localizer runners + + // Container - 1 + ContainerImpl container1 = createMockContainer(user, 1); + String localizerId1 = container1.getContainerID().toString(); + rls.getPrivateLocalizers().put( + localizerId1, + rls.new LocalizerRunner(new LocalizerContext(user, container1 + .getContainerID(), null), localizerId1)); + LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1); + + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container1, + LocalResourceVisibility.PRIVATE, req)); + Assert + .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 200)); + + // Container - 2 now makes the request. + ContainerImpl container2 = createMockContainer(user, 2); + String localizerId2 = container2.getContainerID().toString(); + rls.getPrivateLocalizers().put( + localizerId2, + rls.new LocalizerRunner(new LocalizerContext(user, container2 + .getContainerID(), null), localizerId2)); + LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2); + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container2, + LocalResourceVisibility.PRIVATE, req)); + Assert + .assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 200)); + + // Retrieving localized resource. + LocalResourcesTracker tracker = + rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user, + appId); + LocalizedResource lr = tracker.getLocalizedResource(req); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + // Resource should have one permit + Assert.assertEquals(1, lr.sem.availablePermits()); + + // Resource Localization Service receives first heart beat from + // ContainerLocalizer for container1 + LocalizerHeartbeatResponse response1 = + rls.heartbeat(createLocalizerStatus(localizerId1)); + + // Resource must have been added to scheduled map + Assert.assertEquals(1, localizerRunner1.scheduled.size()); + // Checking resource in the response and also available permits for it. + Assert.assertEquals(req.getResource(), response1.getResourceSpecs() + .get(0).getResource().getResource()); + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Resource Localization Service now receives first heart beat from + // ContainerLocalizer for container2 + LocalizerHeartbeatResponse response2 = + rls.heartbeat(createLocalizerStatus(localizerId2)); + + // Resource must not have been added to scheduled map + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + // No resource is returned in response + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + // ContainerLocalizer - 1 now sends failed resource heartbeat. + rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req)); + + // Resource Localization should fail and state is modified accordingly. + // Also Local should be release on the LocalizedResource. + Assert + .assertTrue(waitForResourceState(lr, rls, req, + LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED, + 200)); + Assert.assertTrue(lr.getState().equals(ResourceState.FAILED)); + Assert.assertEquals(0, localizerRunner1.scheduled.size()); + + // Now Container-2 once again sends heart beat to resource localization + // service + + // Now container-2 again try to download the resource it should still + // not get the resource as the resource is now not in DOWNLOADING state. + response2 = rls.heartbeat(createLocalizerStatus(localizerId2)); + + // Resource must not have been added to scheduled map. + // Also as the resource has failed download it will be removed from + // pending list. + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + Assert.assertEquals(0, localizerRunner2.pending.size()); + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + } + } + + private LocalizerStatus createLocalizerStatusForFailedResource( + String localizerId, LocalResourceRequest req) { + LocalizerStatus status = createLocalizerStatus(localizerId); + LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl(); + resourceStatus.setException(new YarnRemoteExceptionPBImpl("test")); + resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE); + resourceStatus.setResource(req); + status.addResourceStatus(resourceStatus); + return status; + } + + private LocalizerStatus createLocalizerStatus(String localizerId1) { + LocalizerStatus status = new LocalizerStatusPBImpl(); + status.setLocalizerId(localizerId1); + return status; + } + + private LocalizationEvent createApplicationLocalizationEvent(String user, + ApplicationId appId) { + Application app = mock(Application.class); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + return new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app); + } + + @Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testParallelDownloadAttemptsForPublicResource() throws Exception { + + DrainDispatcher dispatcher1 = null; + String user = "testuser"; + try { + // Setting up ResourceLocalization service. + Configuration conf = new Configuration(); + dispatcher1 = new DrainDispatcher(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + // Creating and initializing ResourceLocalizationService but not starting + // it as otherwise it will remove requests from pending queue. + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher1, exec, delService, + dirsHandler); + ResourceLocalizationService spyService = spy(rawService); + dispatcher1.register(LocalizationEventType.class, spyService); + spyService.init(conf); + + // Initially pending map should be empty for public localizer + Assert.assertEquals(0, spyService.getPublicLocalizer().pending.size()); + + LocalResourceRequest req = + new LocalResourceRequest(new Path("/tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + + // Initializing application + ApplicationImpl app = mock(ApplicationImpl.class); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + when(app.getAppId()).thenReturn(appId); + when(app.getUser()).thenReturn(user); + dispatcher1.getEventHandler().handle( + new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + + // Container - 1 + + // container requesting the resource + ContainerImpl container1 = createMockContainer(user, 1); + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container1, + LocalResourceVisibility.PUBLIC, req)); + + // Waiting for resource to change into DOWNLOADING state. + Assert.assertTrue(waitForResourceState(null, spyService, req, + LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING, + 200)); + + // Waiting for download to start. + Assert.assertTrue(waitForPublicDownloadToStart(spyService, 1, 200)); + + LocalizedResource lr = + getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC, + user, null); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + + // pending should have this resource now. + Assert.assertEquals(1, spyService.getPublicLocalizer().pending.size()); + // Now resource should have 0 permit. + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Container - 2 + + // Container requesting the same resource. + ContainerImpl container2 = createMockContainer(user, 2); + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container2, + LocalResourceVisibility.PUBLIC, req)); + + // Waiting for download to start. This should return false as new download + // will not start + Assert.assertFalse(waitForPublicDownloadToStart(spyService, 2, 100)); + + // Now Failing the resource download. As a part of it + // resource state is changed and then lock is released. + ResourceFailedLocalizationEvent locFailedEvent = + new ResourceFailedLocalizationEvent(req, new Exception("test")); + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user, + null).handle(locFailedEvent); + + // Waiting for resource to change into FAILED state. + Assert.assertTrue(waitForResourceState(lr, spyService, req, + LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 200)); + // releasing lock as a part of download failed process. + lr.unlock(); + // removing pending download request. + spyService.getPublicLocalizer().pending.clear(); + + // Now I need to simulate a race condition wherein Event is added to + // dispatcher before resource state changes to either FAILED or LOCALIZED + // Hence sending event directly to dispatcher. + LocalizerResourceRequestEvent localizerEvent = + new LocalizerResourceRequestEvent(lr, null, + mock(LocalizerContext.class), null); + + dispatcher1.getEventHandler().handle(localizerEvent); + // Waiting for download to start. This should return false as new download + // will not start + Assert.assertFalse(waitForPublicDownloadToStart(spyService, 1, 100)); + // Checking available permits now. + Assert.assertEquals(1, lr.sem.availablePermits()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + } + + } + + private boolean waitForPrivateDownloadToStart( + ResourceLocalizationService service, String localizerId, int size, + int maxWaitTime) { + List pending = null; + // Waiting for localizer to be created. + do { + if (service.getPrivateLocalizers().get(localizerId) != null) { + pending = service.getPrivateLocalizers().get(localizerId).pending; + } + if (pending == null) { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (Exception e) { + } + } else { + break; + } + } while (maxWaitTime > 0); + if (pending == null) { + return false; + } + do { + if (pending.size() == size) { + return true; + } else { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (Exception e) { + } + } + } while (maxWaitTime > 0); + return pending.size() == size; + } + + private boolean waitForPublicDownloadToStart( + ResourceLocalizationService service, int size, int maxWaitTime) { + Map, LocalizerResourceRequestEvent> pending = null; + // Waiting for localizer to be created. + do { + if (service.getPublicLocalizer() != null) { + pending = service.getPublicLocalizer().pending; + } + if (pending == null) { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (Exception e) { + } + } else { + break; + } + } while (maxWaitTime > 0); + if (pending == null) { + return false; + } + do { + if (pending.size() == size) { + return true; + } else { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (InterruptedException e) { + } + } + } while (maxWaitTime > 0); + return pending.size() == size; + + } + + private LocalizedResource getLocalizedResource( + ResourceLocalizationService service, LocalResourceRequest req, + LocalResourceVisibility vis, String user, ApplicationId appId) { + return service.getLocalResourcesTracker(vis, user, appId) + .getLocalizedResource(req); + } + + private boolean waitForResourceState(LocalizedResource lr, + ResourceLocalizationService service, LocalResourceRequest req, + LocalResourceVisibility vis, String user, ApplicationId appId, + ResourceState resourceState, long maxWaitTime) { + LocalResourcesTracker tracker = null; + // checking tracker is created + do { + if (tracker == null) { + tracker = service.getLocalResourcesTracker(vis, user, appId); + } + if (tracker != null && lr == null) { + lr = tracker.getLocalizedResource(req); + } + if (lr != null) { + break; + } else { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (InterruptedException e) { + } + } + } while (maxWaitTime > 0); + // this will wait till resource state is changed to (resourceState). + if (lr == null) { + return false; + } + do { + if (!lr.getState().equals(resourceState)) { + try { + maxWaitTime -= 50; + Thread.sleep(50); + } catch (InterruptedException e) { + } + } else { + break; + } + } while (maxWaitTime > 0); + return lr.getState().equals(resourceState); + } + + private ContainerLocalizationRequestEvent createContainerLocalizationEvent( + ContainerImpl container, LocalResourceVisibility vis, + LocalResourceRequest req) { + Map> reqs = + new HashMap>(); + List resourceList = + new ArrayList(); + resourceList.add(req); + reqs.put(vis, resourceList); + return new ContainerLocalizationRequestEvent(container, reqs); + } + + private ContainerImpl createMockContainer(String user, int containerId) { + ContainerImpl container = mock(ContainerImpl.class); + when(container.getContainerID()).thenReturn( + BuilderUtils.newContainerId(1, 1, 1, containerId)); + when(container.getUser()).thenReturn(user); + Credentials mockCredentials = mock(Credentials.class); + when(container.getCredentials()).thenReturn(mockCredentials); + return container; + } + private static URL getPath(String path) { URL url = BuilderUtils.newURL("file", null, 0, path); return url;