diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 66f21f6..51ce9e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; -import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; @@ -130,9 +129,12 @@ public int runLocalization(final InetSocketAddress nmAddr) try { // assume credentials in cwd // TODO: Fix - credFile = lfs.open( - new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId))); + Path tokenPath = + new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId)); + credFile = lfs.open(tokenPath); creds.readTokenStorageStream(credFile); + // Explicitly deleting token file. + lfs.delete(tokenPath, false); } finally { if (credFile != null) { credFile.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 70debe0..06587df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -272,7 +273,11 @@ Server createServer() { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); if (UserGroupInformation.isSecurityEnabled()) { - secretManager = new LocalizerTokenSecretManager(); + secretManager = new LocalizerTokenSecretManager(); + conf = new Configuration(conf); + // always enforce it to be token-based. + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); } Server server = rpc.getServer(LocalizationProtocol.class, this, @@ -1017,6 +1022,7 @@ private void writeCredentials(Path nmPrivateCTokensPath) } } if (UserGroupInformation.isSecurityEnabled()) { + credentials = new Credentials(credentials); LocalizerTokenIdentifier id = secretManager.createIdentifier(); Token localizerToken = new Token(id, secretManager); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index 2d80bb9..983c102 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -34,11 +36,13 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; @@ -46,11 +50,17 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -58,6 +68,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -69,14 +80,17 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestContainerLocalizer { + static final Log LOG = LogFactory.getLog(TestContainerLocalizer.class); static final Path basedir = new Path("target", TestContainerLocalizer.class.getName()); @@ -94,7 +108,10 @@ @Test public void testContainerLocalizerMain() throws Exception { - ContainerLocalizer localizer = setupContainerLocalizerForTest(); + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); + ContainerLocalizer localizer = + setupContainerLocalizerForTest(); // verify created cache List privCacheList = new ArrayList(); @@ -195,14 +212,18 @@ public boolean matches(Object o) { @SuppressWarnings("unchecked") // mocked generics public void testContainerLocalizerClosesFilesystems() throws Exception { // verify filesystems are closed when localizer doesn't fail + FileContext fs = FileContext.getLocalFSFileContext(); + spylfs = spy(fs.getDefaultFileSystem()); ContainerLocalizer localizer = setupContainerLocalizerForTest(); doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class), any(CompletionService.class), any(UserGroupInformation.class)); verify(localizer, never()).closeFileSystems( any(UserGroupInformation.class)); localizer.runLocalization(nmAddr); + verify(spylfs, times(1)).delete(tokenPath, false); verify(localizer).closeFileSystems(any(UserGroupInformation.class)); + spylfs = spy(fs.getDefaultFileSystem()); // verify filesystems are closed when localizer fails localizer = setupContainerLocalizerForTest(); doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles( @@ -217,7 +238,6 @@ public void testContainerLocalizerClosesFilesystems() throws Exception { @SuppressWarnings("unchecked") // mocked generics private ContainerLocalizer setupContainerLocalizerForTest() throws Exception { - spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); // don't actually create dirs doNothing().when(spylfs).mkdir( isA(Path.class), isA(FsPermission.class), anyBoolean()); @@ -245,10 +265,10 @@ private ContainerLocalizer setupContainerLocalizerForTest() containerId))); doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens)) ).when(spylfs).open(tokenPath); - nmProxy = mock(LocalizationProtocol.class); doReturn(nmProxy).when(localizer).getProxy(nmAddr); doNothing().when(localizer).sleep(anyInt()); + // return result instantly for deterministic test ExecutorService syncExec = mock(ExecutorService.class);