diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 927699e..d2d1166 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -124,7 +124,7 @@ LocalizationProtocol getProxy(final InetSocketAddress nmAddr) { } @SuppressWarnings("deprecation") - public int runLocalization(final InetSocketAddress nmAddr) + public void runLocalization(final InetSocketAddress nmAddr) throws IOException, InterruptedException { // load credentials initDirs(conf, user, appId, lfs, localDirs); @@ -168,12 +168,9 @@ public LocalizationProtocol run() { exec = createDownloadThreadPool(); CompletionService ecs = createCompletionService(exec); localizeFiles(nodeManager, ecs, ugi); - return 0; + return; } catch (Throwable e) { - // Print traces to stdout so that they can be logged by the NM address - // space. - e.printStackTrace(System.out); - return -1; + throw new IOException(e); } finally { try { if (exec != null) { @@ -229,7 +226,7 @@ protected void closeFileSystems(UserGroupInformation ugi) { protected void localizeFiles(LocalizationProtocol nodemanager, CompletionService cs, UserGroupInformation ugi) - throws IOException { + throws IOException, YarnException { while (true) { try { LocalizerStatus status = createStatus(); @@ -251,10 +248,14 @@ protected void localizeFiles(LocalizationProtocol nodemanager, pending.cancel(true); } status = createStatus(); - // ignore response + // ignore response while dying. try { nodemanager.heartbeat(status); - } catch (YarnException e) { } + } catch (YarnException e) { + // Cannot do anything about this during death stage, let's just log + // it. + e.printStackTrace(); + } return; } cs.poll(1000, TimeUnit.MILLISECONDS); @@ -262,7 +263,7 @@ protected void localizeFiles(LocalizationProtocol nodemanager, return; } catch (YarnException e) { // TODO cleanup - return; + throw e; } } } @@ -380,16 +381,14 @@ public static void main(String[] argv) throws Throwable { new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, appId, locId, localDirs, RecordFactoryProvider.getRecordFactory(null)); - int nRet = localizer.runLocalization(nmAddr); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("nRet: %d", nRet)); - } - System.exit(nRet); + localizer.runLocalization(nmAddr); + System.exit(0); } catch (Throwable e) { - // Print error to stdout so that LCE can use it. + // Print traces to stdout so that they can be logged by the NM address + // space in both DefaultCE and LCE cases e.printStackTrace(System.out); LOG.error("Exception in main:", e); - throw e; + System.exit(-1); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index 22fad6f..94ccba3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; @@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -167,7 +169,7 @@ public void testContainerLocalizerMain() throws Exception { isA(UserGroupInformation.class)); // run localization - assertEquals(0, localizer.runLocalization(nmAddr)); + localizer.runLocalization(nmAddr); for (Path p : localDirs) { Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser); Path privcache = new Path(base, ContainerLocalizer.FILECACHE); @@ -211,9 +213,30 @@ public void testLocalizerTokenIsGettingRemoved() throws Exception { verify(spylfs, times(1)).delete(tokenPath, false); } + @Test(timeout = 15000) + public void testContainerLocalizerMainFailure() throws Exception { + + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); + ContainerLocalizer localizer = setupContainerLocalizerForTest(); + + // Assume the NM heartbeat fails say because of absent tokens. + when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow( + new YarnException("Sigh, no token!")); + + // run localization, it should fail + try { + localizer.runLocalization(nmAddr); + Assert.fail("Localization succeeded unexpectedly!"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Sigh, no token!")); + } + } + @Test @SuppressWarnings("unchecked") // mocked generics public void testContainerLocalizerClosesFilesystems() throws Exception { + // verify filesystems are closed when localizer doesn't fail FileContext fs = FileContext.getLocalFSFileContext(); spylfs = spy(fs.getDefaultFileSystem()); @@ -226,6 +249,7 @@ public void testContainerLocalizerClosesFilesystems() throws Exception { verify(localizer).closeFileSystems(any(UserGroupInformation.class)); spylfs = spy(fs.getDefaultFileSystem()); + // verify filesystems are closed when localizer fails localizer = setupContainerLocalizerForTest(); doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles( @@ -233,8 +257,12 @@ public void testContainerLocalizerClosesFilesystems() throws Exception { any(UserGroupInformation.class)); verify(localizer, never()).closeFileSystems( any(UserGroupInformation.class)); - localizer.runLocalization(nmAddr); - verify(localizer).closeFileSystems(any(UserGroupInformation.class)); + try { + localizer.runLocalization(nmAddr); + Assert.fail("Localization succeeded unexpectedly!"); + } catch (IOException e) { + verify(localizer).closeFileSystems(any(UserGroupInformation.class)); + } } @SuppressWarnings("unchecked") // mocked generics