diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 97c68aa..f6f2ad3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -686,6 +687,12 @@ public void addResource(LocalizerResourceRequestEvent request) { // TODO Need to Fix IO Exceptions - Notifying resource LOG.error("Local path for public localization is not found. " + " May be disks failed.", e); + } catch (RejectedExecutionException re) { + rsrc.unlock(); + publicRsrc.handle(new ResourceFailedLocalizationEvent(request + .getResource().getRequest(), re.getMessage())); + LOG.error("Failed to submit rsrc " + rsrc + " for download." + + " Either queue is full or threadpool is shutdown.", re); } } else { rsrc.unlock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 8b7c91a..01dcb80 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; @@ -681,6 +683,99 @@ public Void answer(InvocationOnMock invocation) throws IOException { dispatcher.stop(); } } + + /* + * Test case for handling RejectedExecutionException which can be thrown when + * adding public resources to the pending queue. The exception can be thrown + * either due to the incoming queue being full or if the + * ExecutorCompletionService threadpool is shutdown. Since it's hard to + * simulate the queue being full, this test just shuts down the threadpool and + * makes sure the exception is handled. If anything is messed up the async + * dispatcher thread will cause a system exit causing the test to fail. + */ + @Test + @SuppressWarnings("unchecked") + public void testPublicResourceRejectedExecutionException() throws Exception { + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + + DrainDispatcher dispatcher = new DrainDispatcher(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + dispatcher.init(conf); + dispatcher.start(); + + try { + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandler); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext( + isA(Configuration.class)); + + spyService.init(conf); + spyService.start(); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // shutdown the thread pool + PublicLocalizer publicLocalizer = spyService.getPublicLocalizer(); + publicLocalizer.threadPool.shutdown(); + + // init container. + final Container c = getMockContainer(appId, 42); + + // init resources + Random r = new Random(); + r.setSeed(r.nextLong()); + + // Queue localization request for the public resource + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + Map> req = + new HashMap>(); + req + .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq)); + + // send request + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + LocalResourcesTracker tracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + user, appId); + Assert.assertNull(tracker.getLocalizedResource(pubReq)); + + } finally { + // if we call stop with events in the queue, an InterruptedException gets + // thrown resulting in the dispatcher thread causing a system exit + dispatcher.await(); + dispatcher.stop(); + } + } @Test(timeout = 100000) @SuppressWarnings("unchecked") @@ -829,6 +924,8 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception { } } } + + @Test(timeout = 10000) @SuppressWarnings("unchecked")