diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 66f21f6..51ce9e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; -import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; @@ -130,9 +129,12 @@ public int runLocalization(final InetSocketAddress nmAddr) try { // assume credentials in cwd // TODO: Fix - credFile = lfs.open( - new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId))); + Path tokenPath = + new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId)); + credFile = lfs.open(tokenPath); creds.readTokenStorageStream(credFile); + // Explicitly deleting token file. + lfs.delete(tokenPath, false); } finally { if (credFile != null) { credFile.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 70debe0..4cbc37d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -1017,6 +1017,7 @@ private void writeCredentials(Path nmPrivateCTokensPath) } } if (UserGroupInformation.isSecurityEnabled()) { + credentials = new Credentials(credentials); LocalizerTokenIdentifier id = secretManager.createIdentifier(); Token localizerToken = new Token(id, secretManager); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index 2d80bb9..10da1be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +47,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FSDataInputStream; @@ -77,6 +80,7 @@ public class TestContainerLocalizer { + static final Log LOG = LogFactory.getLog(TestContainerLocalizer.class); static final Path basedir = new Path("target", TestContainerLocalizer.class.getName()); @@ -94,7 +98,10 @@ @Test public void testContainerLocalizerMain() throws Exception { - ContainerLocalizer localizer = setupContainerLocalizerForTest(); + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); + ContainerLocalizer localizer = + setupContainerLocalizerForTest(); // verify created cache List privCacheList = new ArrayList(); @@ -190,11 +197,25 @@ public boolean matches(Object o) { } })); } + + @Test + @SuppressWarnings("unchecked") + public void testLocalizerTokenIsGettingRemoved() throws Exception { + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); + ContainerLocalizer localizer = setupContainerLocalizerForTest(); + doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class), + any(CompletionService.class), any(UserGroupInformation.class)); + localizer.runLocalization(nmAddr); + verify(spylfs, times(1)).delete(tokenPath, false); + } @Test @SuppressWarnings("unchecked") // mocked generics public void testContainerLocalizerClosesFilesystems() throws Exception { // verify filesystems are closed when localizer doesn't fail + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); ContainerLocalizer localizer = setupContainerLocalizerForTest(); doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class), any(CompletionService.class), any(UserGroupInformation.class)); @@ -203,6 +224,7 @@ public void testContainerLocalizerClosesFilesystems() throws Exception { localizer.runLocalization(nmAddr); verify(localizer).closeFileSystems(any(UserGroupInformation.class)); + spylfs = spy(fs.getDefaultFileSystem()); // verify filesystems are closed when localizer fails localizer = setupContainerLocalizerForTest(); doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles( @@ -217,7 +239,6 @@ public void testContainerLocalizerClosesFilesystems() throws Exception { @SuppressWarnings("unchecked") // mocked generics private ContainerLocalizer setupContainerLocalizerForTest() throws Exception { - spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); // don't actually create dirs doNothing().when(spylfs).mkdir( isA(Path.class), isA(FsPermission.class), anyBoolean()); @@ -245,10 +266,10 @@ private ContainerLocalizer setupContainerLocalizerForTest() containerId))); doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens)) ).when(spylfs).open(tokenPath); - nmProxy = mock(LocalizationProtocol.class); doReturn(nmProxy).when(localizer).getProxy(nmAddr); doNothing().when(localizer).sleep(anyInt()); + // return result instantly for deterministic test ExecutorService syncExec = mock(ExecutorService.class);