diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index c8048e9..c917c3b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager; -import com.google.common.base.Optional; - import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; @@ -38,14 +36,15 @@ import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.CommandExecutor; +import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -64,6 +63,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; public class DefaultContainerExecutor extends ContainerExecutor { @@ -134,13 +134,25 
@@ public void startLocalizer(LocalizerStartContext ctx) localizerFc.setWorkingDirectory(appStorageDir); LOG.info("Localizer CWD set to " + appStorageDir + " = " + localizerFc.getWorkingDirectory()); + ContainerLocalizer localizer = - new ContainerLocalizer(localizerFc, user, appId, locId, - getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf())); + createContainerLocalizer(user, appId, locId, localDirs, localizerFc); // TODO: DO it over RPC for maintaining similarity? localizer.runLocalization(nmAddr); } + @Private + @VisibleForTesting + protected ContainerLocalizer createContainerLocalizer(String user, String appId, + String locId, List localDirs, FileContext localizerFc) + throws IOException { + ContainerLocalizer localizer = + new ContainerLocalizer(localizerFc, user, appId, locId, + getPaths(localDirs), + RecordFactoryProvider.getRecordFactory(getConf())); + return localizer; + } + @Override public int launchContainer(ContainerStartContext ctx) throws IOException { Container container = ctx.getContainer(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 927699e..57cc346 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import 
org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; @@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ContainerLocalizer { @@ -117,14 +119,16 @@ public ContainerLocalizer(FileContext lfs, String user, String appId, this.pendingResources = new HashMap>(); } - LocalizationProtocol getProxy(final InetSocketAddress nmAddr) { + @Private + @VisibleForTesting + public LocalizationProtocol getProxy(final InetSocketAddress nmAddr) { YarnRPC rpc = YarnRPC.create(conf); return (LocalizationProtocol) rpc.getProxy(LocalizationProtocol.class, nmAddr, conf); } @SuppressWarnings("deprecation") - public int runLocalization(final InetSocketAddress nmAddr) + public void runLocalization(final InetSocketAddress nmAddr) throws IOException, InterruptedException { // load credentials initDirs(conf, user, appId, lfs, localDirs); @@ -168,12 +172,9 @@ public LocalizationProtocol run() { exec = createDownloadThreadPool(); CompletionService ecs = createCompletionService(exec); localizeFiles(nodeManager, ecs, ugi); - return 0; + return; } catch (Throwable e) { - // Print traces to stdout so that they can be logged by the NM address - // space. 
- e.printStackTrace(System.out); - return -1; + throw new IOException(e); } finally { try { if (exec != null) { @@ -229,7 +230,7 @@ protected void closeFileSystems(UserGroupInformation ugi) { protected void localizeFiles(LocalizationProtocol nodemanager, CompletionService cs, UserGroupInformation ugi) - throws IOException { + throws IOException, YarnException { while (true) { try { LocalizerStatus status = createStatus(); @@ -251,10 +252,15 @@ protected void localizeFiles(LocalizationProtocol nodemanager, pending.cancel(true); } status = createStatus(); - // ignore response + // ignore response while dying. try { nodemanager.heartbeat(status); - } catch (YarnException e) { } + } catch (YarnException e) { + // Cannot do anything about this during death stage, let's just log + // it. + e.printStackTrace(System.out); + LOG.error("Heartbeat failed while dying: ", e); + } return; } cs.poll(1000, TimeUnit.MILLISECONDS); @@ -262,7 +268,7 @@ protected void localizeFiles(LocalizationProtocol nodemanager, return; } catch (YarnException e) { // TODO cleanup - return; + throw e; } } } @@ -380,16 +386,14 @@ public static void main(String[] argv) throws Throwable { new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, appId, locId, localDirs, RecordFactoryProvider.getRecordFactory(null)); - int nRet = localizer.runLocalization(nmAddr); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("nRet: %d", nRet)); - } - System.exit(nRet); + localizer.runLocalization(nmAddr); + return; } catch (Throwable e) { - // Print error to stdout so that LCE can use it. 
+ // Print traces to stdout so that they can be logged by the NM address + // space in both DefaultCE and LCE cases e.printStackTrace(System.out); LOG.error("Exception in main:", e); - throw e; + System.exit(-1); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 5696ae8..37cbf93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -20,65 +20,60 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doAnswer; -import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.BufferedWriter; import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileReader; import java.io.FileWriter; -import java.io.InputStream; import java.io.IOException; -import java.io.LineNumberReader; import java.net.InetSocketAddress; import 
java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Random; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; -import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; - +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; -import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -345,9 +340,8 @@ public Object answer(InvocationOnMock invocationOnMock) } @Test(timeout = 30000) - public void testStartLocalizer() - throws IOException, InterruptedException { - InetSocketAddress localizationServerAddress; + public void testStartLocalizer() throws IOException, InterruptedException, + YarnException { final Path firstDir = new Path(BASE_TMP_PATH, "localDir1"); List localDirs = new ArrayList(); @@ -358,11 +352,6 @@ public void testStartLocalizer() FsPermission perms = new FsPermission((short)0770); Configuration conf = new Configuration(); - localizationServerAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_LOCALIZER_ADDRESS, - YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, - YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf)); final FileContext.Util mockUtil = spy(mockLfs.util()); @@ -400,6 +389,7 @@ public Object answer(InvocationOnMock invocationOnMock) return null; } }).when(mockUtil).copy(any(Path.class), any(Path.class)); + doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) @@ -415,8 +405,33 @@ public Object answer(InvocationOnMock invocationOnMock) } }).when(mockLfs).getFsStatus(any(Path.class)); - DefaultContainerExecutor mockExec = spy(new 
DefaultContainerExecutor( - mockLfs)); + DefaultContainerExecutor mockExec = + spy(new DefaultContainerExecutor(mockLfs) { + @Override + public ContainerLocalizer createContainerLocalizer(String user, + String appId, String locId, List localDirs, + FileContext localizerFc) throws IOException { + + // Spy on the localizer and make it return valid heart-beat + // responses even though there is no real NodeManager. + ContainerLocalizer localizer = + super.createContainerLocalizer(user, appId, locId, localDirs, + localizerFc); + ContainerLocalizer spyLocalizer = spy(localizer); + LocalizationProtocol nmProxy = mock(LocalizationProtocol.class); + try { + when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenReturn( + new MockLocalizerHeartbeatResponse(LocalizerAction.DIE, + new ArrayList())); + } catch (YarnException e) { + throw new IOException(e); + } + when(spyLocalizer.getProxy(any(InetSocketAddress.class))) + .thenReturn(nmProxy); + + return spyLocalizer; + } + }); mockExec.setConf(conf); localDirs.add(mockLfs.makeQualified(firstDir).toString()); localDirs.add(mockLfs.makeQualified(secondDir).toString()); @@ -433,18 +448,20 @@ public Object answer(InvocationOnMock invocationOnMock) LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class); when(dirsHandler.getLocalDirs()).thenReturn(localDirs); when(dirsHandler.getLogDirs()).thenReturn(logDirs); - + try { mockExec.startLocalizer(new LocalizerStartContext.Builder() .setNmPrivateContainerTokens(nmPrivateCTokensPath) - .setNmAddr(localizationServerAddress) + .setNmAddr(null) .setUser(appSubmitter) .setAppId(appId) .setLocId(locId) .setDirsHandler(dirsHandler) .build()); + } catch (IOException e) { - Assert.fail("StartLocalizer failed to copy token file " + e); + Assert.fail("StartLocalizer failed to copy token file: " + + StringUtils.stringifyException(e)); } finally { mockExec.deleteAsUser(new DeletionAsUserContext.Builder() .setUser(appSubmitter) @@ -460,7 +477,12 @@ public Object 
answer(InvocationOnMock invocationOnMock) .build()); deleteTmpFiles(); } + + // Verify that the calls happen the expected number of times + verify(mockUtil, times(1)).copy(any(Path.class), any(Path.class)); + verify(mockLfs, times(2)).getFsStatus(any(Path.class)); } + // @Test // public void testInit() throws IOException, InterruptedException { // Configuration conf = new Configuration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java index 1fcf5bf..1f444c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; @@ -30,11 +29,7 @@ LocalizerAction action; List resourceSpecs; - MockLocalizerHeartbeatResponse() { - resourceSpecs = new ArrayList(); - } - - MockLocalizerHeartbeatResponse( + public MockLocalizerHeartbeatResponse( LocalizerAction action, List resources) { this.action = action; this.resourceSpecs = resources; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index 22fad6f..3d6a2a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; @@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -98,7 +100,7 @@ private LocalizationProtocol nmProxy; @Test - public void testContainerLocalizerMain() throws Exception { + public void testMain() throws Exception { FileContext fs = FileContext.getLocalFSFileContext(); spylfs = spy(fs.getDefaultFileSystem()); ContainerLocalizer localizer = @@ -167,7 +169,7 @@ public void testContainerLocalizerMain() throws Exception { isA(UserGroupInformation.class)); // run localization - assertEquals(0, localizer.runLocalization(nmAddr)); + localizer.runLocalization(nmAddr); for (Path p : localDirs) { Path base = new 
Path(new Path(p, ContainerLocalizer.USERCACHE), appUser); Path privcache = new Path(base, ContainerLocalizer.FILECACHE); @@ -198,7 +200,27 @@ public boolean matches(Object o) { } })); } - + + @Test(timeout = 15000) + public void testMainFailure() throws Exception { + + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); + ContainerLocalizer localizer = setupContainerLocalizerForTest(); + + // Assume the NM heartbeat fails, say, because of absent tokens. + when(nmProxy.heartbeat(isA(LocalizerStatus.class))).thenThrow( + new YarnException("Sigh, no token!")); + + // Run localization; it should fail. + try { + localizer.runLocalization(nmAddr); + Assert.fail("Localization succeeded unexpectedly!"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Sigh, no token!")); + } + } + @Test @SuppressWarnings("unchecked") public void testLocalizerTokenIsGettingRemoved() throws Exception { @@ -214,18 +236,22 @@ public void testLocalizerTokenIsGettingRemoved() throws Exception { @Test @SuppressWarnings("unchecked") // mocked generics public void testContainerLocalizerClosesFilesystems() throws Exception { + // verify filesystems are closed when localizer doesn't fail FileContext fs = FileContext.getLocalFSFileContext(); spylfs = spy(fs.getDefaultFileSystem()); + ContainerLocalizer localizer = setupContainerLocalizerForTest(); doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class), any(CompletionService.class), any(UserGroupInformation.class)); verify(localizer, never()).closeFileSystems( any(UserGroupInformation.class)); + localizer.runLocalization(nmAddr); verify(localizer).closeFileSystems(any(UserGroupInformation.class)); spylfs = spy(fs.getDefaultFileSystem()); + // verify filesystems are closed when localizer fails localizer = setupContainerLocalizerForTest(); doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles( @@ -233,8 +259,12 @@ public void
testContainerLocalizerClosesFilesystems() throws Exception { any(UserGroupInformation.class)); verify(localizer, never()).closeFileSystems( any(UserGroupInformation.class)); - localizer.runLocalization(nmAddr); - verify(localizer).closeFileSystems(any(UserGroupInformation.class)); + try { + localizer.runLocalization(nmAddr); + Assert.fail("Localization succeeded unexpectedly!"); + } catch (IOException e) { + verify(localizer).closeFileSystems(any(UserGroupInformation.class)); + } } @SuppressWarnings("unchecked") // mocked generics