diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 2b7cd5f..b82c8a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -466,4 +466,11 @@ public ReservationDeleteResponse deleteReservation( throws YarnException, IOException { return client.getClusterNodeLabels(); } + + @Override + public YarnApplicationState waitForApplicationState(ApplicationId appId, + boolean verbose, Set terminateStates) + throws YarnException, IOException { + return client.waitForApplicationState(appId, verbose, terminateStates); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 5ce626c..632d37d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -664,6 +664,27 @@ public abstract ReservationDeleteResponse deleteReservation( */ @Public @Unstable - public abstract List getClusterNodeLabels() + public abstract List getClusterNodeLabels() throws YarnException, + IOException; + + /** + *

+ * Wait for the application until its state becomes one of + * {terminateStates} OR a stopped state {FINISHED, FAILED, KILLED} + *

+   *
+   * @param appId
+   *          Application to wait for
+   * @param verbose
+   *          Print verbose messages while waiting
+   * @param terminateStates
+   *          States this method is waiting for; pass null if you only want to
+   *          wait for a stopped state - {FINISHED, FAILED, KILLED}
+   * @return The latest state
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract YarnApplicationState waitForApplicationState(ApplicationId appId,
+      boolean verbose, Set terminateStates)
       throws YarnException, IOException;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 42dd5cd..7d5e0d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -134,6 +134,9 @@
   protected boolean timelineServiceBestEffort;
 
   private static final String ROOT = "root";
+  private static final Set STOPPED_APPLICATION_STATES = EnumSet
+      .of(YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+          YarnApplicationState.FINISHED);
 
   public YarnClientImpl() {
     super(YarnClientImpl.class.getName());
@@ -820,4 +823,77 @@ public ReservationDeleteResponse deleteReservation(
     return rmClient.getClusterNodeLabels(
         GetClusterNodeLabelsRequest.newInstance()).getNodeLabels();
   }
+
+  /**
+   * Polls the application report, sleeping asyncApiPollIntervalMillis
+   * milliseconds between polls, until the application is KILLED, FAILED or
+   * FINISHED, or is in one of the caller-supplied {@code terminateStates}.
+   *
+   * @param appId application to wait for
+   * @param verbose log state changes and, for stopped states, a summary
+   * @param terminateStates additional states that end the wait; may be null
+   * @return the state that ended the wait
+   * @throws YarnException propagated from getApplicationReport, or raised
+   *           when the polling sleep is interrupted
+   * @throws IOException propagated from getApplicationReport
+   */
+  @Override
+  public YarnApplicationState waitForApplicationState(ApplicationId appId,
+      boolean verbose, Set terminateStates)
+      throws YarnException, IOException {
+    YarnApplicationState lastState = null;
+    while (true) {
+      // Get application report for our specified appId
+      ApplicationReport report = getApplicationReport(appId);
+      YarnApplicationState state = report.getYarnApplicationState();
+
+      // print verbose message when state changed
+      if (verbose) {
+        if (lastState == null) {
+          LOG.info(String.format("Application %s's state now is [%s]",
+              appId.toString(), state.name()));
+          lastState = state;
+        } else if (lastState != state) {
+          LOG.info(String.format(
+              "Application %s's state transferred from [%s] to [%s]",
+              appId.toString(), lastState.name(), state.name()));
+          lastState = state;
+        }
+      }
+
+      if (STOPPED_APPLICATION_STATES.contains(state)
+          || (null != terminateStates && terminateStates.contains(state))) {
+
+        // print verbose message if it's stopped state
+        if (verbose && STOPPED_APPLICATION_STATES.contains(state)) {
+          // if it's not succeed, print diagnostic message
+          if (YarnApplicationState.FINISHED != state) {
+            String diagMsg = report.getDiagnostics();
+            if (null != diagMsg) {
+              LOG.info("Diagnostic message:");
+              LOG.info(diagMsg);
+            }
+          }
+
+          LOG.info(String.format(
+              "Application %s's total running time is %f seconds",
+              appId.toString(),
+              ((report.getFinishTime() - report.getStartTime()) / 1000.0)));
+        }
+
+        return state;
+      }
+
+      // Check app status after asyncApiPollIntervalMillis
+      try {
+        Thread.sleep(asyncApiPollIntervalMillis);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop waiting: swallowing the
+        // interrupt would make this loop impossible to cancel.
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while waiting on the state of "
+            + "application " + appId, e);
+      }
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 10b9bbb..9792b38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -955,7 +955,49 @@ public void testAsyncAPIPollTimeout() {
     testAsyncAPIPollTimeoutHelper(0L, true);
     testAsyncAPIPollTimeoutHelper(1L, true);
   }
+
+  @Test(timeout = 10000)
+  public void
testWaitForApplicationStates() throws YarnException, IOException { + testWaitForApplicationStateInternal(null, true); + + Set states = + EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.SUBMITTED); + testWaitForApplicationStateInternal(states, false); + } + + private void + testWaitForApplicationStateInternal(Set states, boolean verbose) + throws YarnException, IOException { + Configuration conf = new Configuration(); + + // set a shorter pull interval to make this test finished earlier + final int yarnAppStateChangeInterval = 50; + conf.setLong( + YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + yarnAppStateChangeInterval); + MockYarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + + YarnApplicationState returnedState = null; + + if (null == states) { + // if user not set, simply use Finished + client.setYarnApplicationState(YarnApplicationState.FINISHED); + returnedState = YarnApplicationState.FINISHED; + } else { + // otherwise, use the first state passed in + returnedState = states.iterator().next(); + client.setYarnApplicationState(returnedState); + } + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + YarnApplicationState state = + client.waitForApplicationState(applicationId, verbose, states); + + Assert.assertEquals(returnedState, state); + } + private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout, boolean expectedTimeoutEnforcement) { YarnClientImpl client = new YarnClientImpl(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 7cf6b15..de602a6 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -326,8 +326,7 @@ public boolean remove(LocalizedResource rem, DeletionService delService) { + " from " + getUser()); return true; } - if (rsrc.getRefCount() > 0 - || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) { + if (rsrc.getRefCount() > 0 || rsrc != rem) { // internal error LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java index 447a792..3e3aac0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java @@ -58,6 +58,10 @@ public void addResources(LocalResourcesTracker newTracker) { // always retain resources in use continue; } + if(ResourceState.DOWNLOADING == resource.getState()) { + newTracker.remove(resource, delService); + continue; + } retain.put(resource, newTracker); } for (Iterator> i = diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 2edaf45..dcab91e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1681,6 +1681,131 @@ public void testLocalResourcePath() throws Exception { } } } + + + @Test + @SuppressWarnings("unchecked") // mocked generics + public void testCacheCleanup() throws Exception { + final String user = "user"; + final ApplicationId appId = ApplicationId.newInstance(1, 1); + + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, 1); + + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + 
dispatcher.register(ContainerEventType.class, containerBus); + EventHandler localizerBus = mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerBus); + + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + ResourceLocalizationService spyService = + createSpyService(dispatcher, dirsHandler, stateStore); + try { + spyService.init(conf); + spyService.start(); + + final Application app = mock(Application.class); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // Setup a private and public resource tracker + LocalResourcesTracker privTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, + user, null); + LocalResourcesTracker pubTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + null, null); + + final Container c = getMockContainer(appId, 1, user); + + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + + // Send localization requests of each type. 
+ final LocalResource privResource1 = getPrivateMockedResource(r); + final LocalResourceRequest privReq1 = + new LocalResourceRequest(privResource1); + + final LocalResource privResource2 = getPrivateMockedResource(r); + final LocalResourceRequest privReq2 = + new LocalResourceRequest(privResource2); + + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = + new LocalResourceRequest(pubResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PRIVATE, + Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 })); + req.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq)); + + // Send Container Localization Request + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + + // Start Localization + privTracker.getPathForLocalization(privReq1, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.USERCACHE + user)); + LocalizedResource privLr1 = privTracker.getLocalizedResource(privReq1); + privTracker.getPathForLocalization(privReq2, + dirsHandler.getLocalPathForWrite( + ContainerLocalizer.USERCACHE + user)); + LocalizedResource privLr2 = privTracker.getLocalizedResource(privReq2); + pubTracker.getPathForLocalization(pubReq, + dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); + LocalizedResource pubLr = pubTracker.getLocalizedResource(pubReq); + + // Complete Localization for one of the resources + assertNotNull("Localization not started", privLr2.getLocalPath()); + privTracker.handle(new ResourceLocalizedEvent(privReq2, + privLr2.getLocalPath(), privLr2.getSize() + 1048599)); + + + assertEquals(ResourceState.DOWNLOADING, privLr1.getState()); + assertEquals(ResourceState.LOCALIZED, privLr2.getState()); + assertEquals(ResourceState.DOWNLOADING, pubLr.getState()); + + // Call for Container Localization Cleanup + spyService.handle(new ContainerLocalizationCleanupEvent(c, req)); + dispatcher.await(); + + // Inovke Cache Cleanup 
directly + spyService.handle(new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP)); + dispatcher.await(); + assertEquals(null, privTracker.getLocalizedResource(privReq1)); + assertEquals(null, privTracker.getLocalizedResource(privReq2)); + assertEquals(null, pubTracker.getLocalizedResource(pubReq)); + } finally { + dispatcher.stop(); + stateStore.close(); + } + } private LocalizerStatus createLocalizerStatusForFailedResource( String localizerId, LocalResourceRequest req) {