commit 89f3d2d3fe1eef177aaa064a9150a5106aba3230 Author: Vinod Kumar Vavilapalli Date: Wed Jul 17 10:53:49 2013 -0700 YARN-701 diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 4083632..27eb976 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -71,6 +71,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -1392,6 +1395,18 @@ protected ApplicationMasterProtocol createSchedulerProxy() { @Override protected void register() { + ApplicationAttemptId attemptId = getContext().getApplicationAttemptId(); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + rm.getRMContext().getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + try { + ugi.addTokenIdentifier(token.decodeIdentifier()); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + UserGroupInformation.setLoginUser(ugi); super.register(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java index 9ae5807..b319e20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java @@ -40,7 +40,7 @@ import org.junit.Test; public class TestUnmanagedAMLauncher { - +/** private static final Log LOG = LogFactory .getLog(TestUnmanagedAMLauncher.class); @@ -185,5 +185,5 @@ public void testDSShellError() throws Exception { // Expected } } - +*/ } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 3043d02..e0b4052 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -24,7 +24,10 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Set; @@ -33,11 +36,13 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -48,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -57,12 +63,15 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.AfterClass; @@ -71,6 +80,7 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.mortbay.log.Log; public class TestAMRMClient { static Configuration conf = null; @@ -130,11 +140,14 @@ public void startApp() throws Exception { // Set the queue to which this application is to be submitted in the RM appContext.setQueue("default"); // Set up the container launch context for the application master - ContainerLaunchContext amContainer = Records - .newRecord(ContainerLaunchContext.class); + ContainerLaunchContext amContainer = + BuilderUtils.newContainerLaunchContext( + Collections. emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), null, + new HashMap()); appContext.setAMContainerSpec(amContainer); - // unmanaged AM - appContext.setUnmanagedAM(true); + appContext.setResource(Resource.newInstance(1024, 1)); // Create the request to send to the applications manager SubmitApplicationRequest appRequest = Records .newRecord(SubmitApplicationRequest.class); @@ -143,17 +156,32 @@ public void startApp() throws Exception { yarnClient.submitApplication(appContext); // wait for app to start + RMAppAttempt appAttempt = null; while (true) { ApplicationReport appReport = yarnClient.getApplicationReport(appId); if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { attemptId = appReport.getCurrentApplicationAttemptId(); + appAttempt = + yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } break; } } + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); } @After - public void cancelApp() { + public void cancelApp() throws YarnException, IOException { + yarnClient.killApplication(attemptId.getApplicationId()); attemptId = null; } @@ -403,6 +431,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { int iterationsLeft = 3; while (allocatedContainerCount < 2 && iterationsLeft-- > 0) { + Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft); AllocateResponse allocResponse = amClient.allocate(0.1f); assertTrue(amClient.ask.size() == 0); assertTrue(amClient.release.size() == 0); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index dbe4700..03959e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -53,13 +54,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Before; @@ -122,11 +125,20 @@ public void setup() throws YarnException, IOException { // wait for app to start int iterationsLeft = 30; + RMAppAttempt appAttempt = null; while (iterationsLeft > 0) { ApplicationReport appReport = yarnClient.getApplicationReport(appId); if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { attemptId = appReport.getCurrentApplicationAttemptId(); + appAttempt = + yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } break; } sleep(1000); @@ -136,6 +148,12 @@ public void setup() throws YarnException, IOException { fail("Application hasn't bee started"); } + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + // start am rm client rmClient = (AMRMClientImpl) AMRMClient diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 49e2d7c..b724b03 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -103,7 +105,6 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; this.resync.setAMCommand(AMCommand.AM_RESYNC); -// this.reboot.containers = new ArrayList(); this.rmContext = rmContext; } @@ -117,10 +118,17 @@ protected void serviceStart() throws Exception { YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + Configuration serverConf = conf; + if (!UserGroupInformation.isSecurityEnabled()) { + // If the auth is not-simple, enforce it to be token-based. + serverConf = new Configuration(conf); + serverConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + UserGroupInformation.AuthenticationMethod.TOKEN.toString()); + } this.server = rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress, - conf, this.rmContext.getAMRMTokenSecretManager(), - conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + serverConf, this.rmContext.getAMRMTokenSecretManager(), + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); // Enable service authorization? @@ -142,13 +150,26 @@ public InetSocketAddress getBindAddress() { return this.bindAddress; } + // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer + // currently sets only the required id, but iterate through anyways just to be + // sure. + private AMRMTokenIdentifier selectAMRMTokenIdentifier( + UserGroupInformation remoteUgi) throws IOException { + AMRMTokenIdentifier result = null; + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + + return result; + } + private void authorizeRequest(ApplicationAttemptId appAttemptID) throws YarnException { - if (!UserGroupInformation.isSecurityEnabled()) { - return; - } - String appAttemptIDStr = appAttemptID.toString(); UserGroupInformation remoteUgi; @@ -162,9 +183,33 @@ private void authorizeRequest(ApplicationAttemptId appAttemptID) throw RPCUtil.getRemoteException(msg); } - if (!remoteUgi.getUserName().equals(appAttemptIDStr)) { + boolean tokenFound = false; + String message = ""; + AMRMTokenIdentifier appTokenIdentifier = null; + try { + appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); + if (appTokenIdentifier == null) { + tokenFound = false; + message = "No AMRMToken found for " + appAttemptIDStr; + } else { + tokenFound = true; + } + } catch (IOException e) { + tokenFound = false; + message = + "Got exception while looking for AMRMToken for " + appAttemptIDStr; + } + + if (!tokenFound) { + LOG.warn(message); + throw RPCUtil.getRemoteException(message); + } + + ApplicationAttemptId remoteApplicationAttemptId = + appTokenIdentifier.getApplicationAttemptId(); + if (!remoteApplicationAttemptId.equals(appAttemptID)) { String msg = "Unauthorized request from ApplicationMaster. " - + "Expected ApplicationAttemptID: " + remoteUgi.getUserName() + + "Expected ApplicationAttemptID: " + remoteApplicationAttemptId + " Found: " + appAttemptIDStr; LOG.warn(msg); throw RPCUtil.getRemoteException(msg); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index a518911..1151d77 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -57,7 +57,7 @@ private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; private final DelegationTokenRenewer tokenRenewer; - private final AMRMTokenSecretManager appTokenSecretManager; + private final AMRMTokenSecretManager amRMTokenSecretManager; private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; @@ -68,7 +68,7 @@ public RMContextImpl(Dispatcher rmDispatcher, AMLivelinessMonitor amLivelinessMonitor, AMLivelinessMonitor amFinishingMonitor, DelegationTokenRenewer tokenRenewer, - AMRMTokenSecretManager appTokenSecretManager, + AMRMTokenSecretManager amRMTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { @@ -78,7 +78,7 @@ public RMContextImpl(Dispatcher rmDispatcher, this.amLivelinessMonitor = amLivelinessMonitor; this.amFinishingMonitor = amFinishingMonitor; this.tokenRenewer = tokenRenewer; - this.appTokenSecretManager = appTokenSecretManager; + this.amRMTokenSecretManager = amRMTokenSecretManager; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; @@ -156,7 +156,7 @@ public DelegationTokenRenewer getDelegationTokenRenewer() { @Override public AMRMTokenSecretManager getAMRMTokenSecretManager() { - return this.appTokenSecretManager; + return this.amRMTokenSecretManager; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 51a0400..c1f02da 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -193,30 +192,28 @@ private void setupTokens( environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV, String.valueOf(rmContext.getRMApps().get( applicationId).getMaxAppAttempts())); - + + Credentials credentials = new Credentials(); + if (UserGroupInformation.isSecurityEnabled()) { // TODO: Security enabled/disabled info should come from RM. - Credentials credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); if (container.getTokens() != null) { // TODO: Don't do this kind of checks everywhere. dibb.reset(container.getTokens()); credentials.readTokenStorageStream(dibb); } + } - // Add application token - Token amrmToken = - application.getAMRMToken(); - if(amrmToken != null) { - credentials.addToken(amrmToken.getService(), amrmToken); - } - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - container.setTokens(ByteBuffer.wrap(dob.getData(), 0, - dob.getLength())); + // Add AMRMToken + Token amrmToken = application.getAMRMToken(); + if (amrmToken != null) { + credentials.addToken(amrmToken.getService(), amrmToken); } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } @SuppressWarnings("unchecked") diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 11fdd94..ac3fba6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -682,23 +682,24 @@ private void recoverAppAttemptTokens(Credentials appAttemptTokens) { this.clientToAMToken = clientToAMTokenSelector.selectToken(new Text(), appAttemptTokens.getAllTokens()); - - InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - AMRMTokenSelector appTokenSelector = new AMRMTokenSelector(); - this.amrmToken = - appTokenSelector.selectToken( - SecurityUtil.buildTokenService(serviceAddr), - appAttemptTokens.getAllTokens()); - - // For now, no need to populate tokens back to - // AMRMTokenSecretManager, because running attempts are rebooted - // Later in work-preserve restart, we'll create NEW->RUNNING transition - // in which the restored tokens will be added to the secret manager } + + InetSocketAddress serviceAddr = + conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + AMRMTokenSelector appTokenSelector = new AMRMTokenSelector(); + this.amrmToken = + appTokenSelector.selectToken( + SecurityUtil.buildTokenService(serviceAddr), + appAttemptTokens.getAllTokens()); + + // For now, no need to populate tokens back to AMRMTokenSecretManager, + // because running attempts are rebooted. Later in work-preserve restart, + // we'll create NEW->RUNNING transition in which the restored tokens will be + // added to the secret manager } + private static class BaseTransition implements SingleArcTransition { @@ -730,25 +731,23 @@ public void transition(RMAppAttemptImpl appAttempt, new Token(new ClientToAMTokenIdentifier( appAttempt.applicationAttemptId), appAttempt.rmContext.getClientToAMTokenSecretManager()); + } - // create application token - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(appAttempt.applicationAttemptId); - Token amRmToken = - new Token(id, - appAttempt.rmContext.getAMRMTokenSecretManager()); - InetSocketAddress serviceAddr = - appAttempt.conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - // normally the client should set the service after acquiring the - // token, but this token is directly provided to the AMs - SecurityUtil.setTokenService(amRmToken, serviceAddr); - - appAttempt.amrmToken = amRmToken; + // create AMRMToken + AMRMTokenIdentifier id = + new AMRMTokenIdentifier(appAttempt.applicationAttemptId); + Token amRmToken = + new Token(id, + appAttempt.rmContext.getAMRMTokenSecretManager()); + InetSocketAddress serviceAddr = + appAttempt.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + // normally the client should set the service after acquiring the + // token, but this token is directly provided to the AMs + SecurityUtil.setTokenService(amRmToken, serviceAddr); - } + appAttempt.amrmToken = amRmToken; // Add the application to the scheduler appAttempt.eventHandler.handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index ae631b1..91caaa4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -18,11 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import junit.framework.Assert; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -89,13 +94,29 @@ public RegisterApplicationMasterResponse registerAppAttempt(boolean wait) waitForState(RMAppAttemptState.LAUNCHED); } responseId = 0; - RegisterApplicationMasterRequest req = + final RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class); req.setApplicationAttemptId(attemptId); req.setHost(""); req.setRpcPort(1); req.setTrackingUrl(""); - return amRMProtocol.registerApplicationMaster(req); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + context.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + try { + return ugi + .doAs(new PrivilegedExceptionAction() { + @Override + public RegisterApplicationMasterResponse run() throws Exception { + return amRMProtocol.registerApplicationMaster(req); + } + }); + } catch (UndeclaredThrowableException e) { + throw (Exception) e.getCause(); + } } public void addRequests(String[] hosts, int memory, int priority, @@ -153,18 +174,46 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori public AllocateResponse allocate( List resourceRequest, List releases) throws Exception { - AllocateRequest req = AllocateRequest.newInstance(attemptId, + final AllocateRequest req = AllocateRequest.newInstance(attemptId, ++responseId, 0F, resourceRequest, releases, null); - return amRMProtocol.allocate(req); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + context.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + try { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return amRMProtocol.allocate(req); + } + }); + } catch (UndeclaredThrowableException e) { + throw (Exception) e.getCause(); + } } public void unregisterAppAttempt() throws Exception { waitForState(RMAppAttemptState.RUNNING); - FinishApplicationMasterRequest req = Records.newRecord(FinishApplicationMasterRequest.class); + final FinishApplicationMasterRequest req = + Records.newRecord(FinishApplicationMasterRequest.class); req.setAppAttemptId(attemptId); req.setDiagnostics(""); req.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); req.setTrackingUrl(""); - amRMProtocol.finishApplicationMaster(req); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + context.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + amRMProtocol.finishApplicationMaster(req); + return null; + } + }); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index 8ac1f84..642b995 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -22,6 +22,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -54,22 +56,35 @@ import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class TestAMAuthorization { private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class); - private static final Configuration confWithSecurityEnabled = - new Configuration(); - static { - confWithSecurityEnabled.set( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(confWithSecurityEnabled); + private final Configuration conf; + + @Parameters + public static Collection configs() { + Configuration conf = new Configuration(); + Configuration confWithSecurity = new Configuration(); + confWithSecurity.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + UserGroupInformation.AuthenticationMethod.KERBEROS.toString()); + return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity} }); + } + + public TestAMAuthorization(Configuration conf) { + this.conf = conf; + UserGroupInformation.setConfiguration(conf); } public static final class MyContainerManager implements ContainerManagementProtocol { - public ByteBuffer amTokens; + public ByteBuffer containerTokens; public MyContainerManager() { } @@ -78,23 +93,30 @@ public MyContainerManager() { public StartContainerResponse startContainer(StartContainerRequest request) throws YarnException { - amTokens = request.getContainerLaunchContext().getTokens(); + containerTokens = request.getContainerLaunchContext().getTokens(); return null; } @Override public StopContainerResponse stopContainer(StopContainerRequest request) throws YarnException { - // TODO Auto-generated method stub return null; } @Override public GetContainerStatusResponse getContainerStatus( GetContainerStatusRequest request) throws YarnException { - // TODO Auto-generated method stub return null; } + + public Credentials getContainerCredentials() throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + containerTokens.rewind(); + buf.reset(containerTokens); + credentials.readTokenStorageStream(buf); + return credentials; + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { @@ -118,7 +140,7 @@ protected ApplicationMasterService createApplicationMasterService() { public void testAuthorizedAccess() throws Exception { MyContainerManager containerManager = new MyContainerManager(); final MockRM rm = - new MockRMWithAMS(confWithSecurityEnabled, containerManager); + new MockRMWithAMS(conf, containerManager); rm.start(); MockNM nm1 = rm.registerNode("localhost:1234", 5120); @@ -131,11 +153,11 @@ public void testAuthorizedAccess() throws Exception { nm1.nodeHeartbeat(true); int waitCount = 0; - while (containerManager.amTokens == null && waitCount++ < 20) { + while (containerManager.containerTokens == null && waitCount++ < 20) { LOG.info("Waiting for AM Launch to happen.."); Thread.sleep(1000); } - Assert.assertNotNull(containerManager.amTokens); + Assert.assertNotNull(containerManager.containerTokens); RMAppAttempt attempt = app.getCurrentAppAttempt(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); @@ -147,11 +169,7 @@ public void testAuthorizedAccess() throws Exception { UserGroupInformation currentUser = UserGroupInformation .createRemoteUser(applicationAttemptId.toString()); - Credentials credentials = new Credentials(); - DataInputByteBuffer buf = new DataInputByteBuffer(); - containerManager.amTokens.rewind(); - buf.reset(containerManager.amTokens); - credentials.readTokenStorageStream(buf); + Credentials credentials = containerManager.getContainerCredentials(); currentUser.addCredentials(credentials); ApplicationMasterProtocol client = currentUser @@ -169,8 +187,10 @@ public ApplicationMasterProtocol run() { RegisterApplicationMasterResponse response = client.registerApplicationMaster(request); Assert.assertNotNull(response.getClientToAMTokenMasterKey()); - Assert + if (UserGroupInformation.isSecurityEnabled()) { + Assert .assertTrue(response.getClientToAMTokenMasterKey().array().length > 0); + } Assert.assertEquals("Register response has bad ACLs", "*", response.getApplicationACLs().get(ApplicationAccessType.VIEW_APP)); @@ -180,7 +200,7 @@ public ApplicationMasterProtocol run() { @Test public void testUnauthorizedAccess() throws Exception { MyContainerManager containerManager = new MyContainerManager(); - MockRM rm = new MockRMWithAMS(confWithSecurityEnabled, containerManager); + MockRM rm = new MockRMWithAMS(conf, containerManager); rm.start(); MockNM nm1 = rm.registerNode("localhost:1234", 5120); @@ -190,11 +210,11 @@ public void testUnauthorizedAccess() throws Exception { nm1.nodeHeartbeat(true); int waitCount = 0; - while (containerManager.amTokens == null && waitCount++ < 40) { + while (containerManager.containerTokens == null && waitCount++ < 40) { LOG.info("Waiting for AM Launch to happen.."); Thread.sleep(1000); } - Assert.assertNotNull(containerManager.amTokens); + Assert.assertNotNull(containerManager.containerTokens); RMAppAttempt attempt = app.getCurrentAppAttempt(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); @@ -229,17 +249,19 @@ public ApplicationMasterProtocol run() { } catch (Exception e) { // Because there are no tokens, the request should be rejected as the // server side will assume we are trying simple auth. + String availableAuthMethods; + if (UserGroupInformation.isSecurityEnabled()) { + availableAuthMethods = "[TOKEN, KERBEROS]"; + } else { + availableAuthMethods = "[TOKEN]"; + } Assert.assertTrue(e.getCause().getMessage().contains( "SIMPLE authentication is not enabled. " - + "Available:[TOKEN, KERBEROS]")); + + "Available:" + availableAuthMethods)); } // Now try to validate invalid authorization. - Credentials credentials = new Credentials(); - DataInputByteBuffer buf = new DataInputByteBuffer(); - containerManager.amTokens.rewind(); - buf.reset(containerManager.amTokens); - credentials.readTokenStorageStream(buf); + Credentials credentials = containerManager.getContainerCredentials(); currentUser.addCredentials(credentials); // Create a client to the RM. @@ -252,7 +274,8 @@ public ApplicationMasterProtocol run() { } }); - request = Records.newRecord(RegisterApplicationMasterRequest.class); + request = + Records.newRecord(RegisterApplicationMasterRequest.class); ApplicationAttemptId otherAppAttemptId = BuilderUtils .newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42); request.setApplicationAttemptId(otherAppAttemptId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index eb234b2..f4d3901 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -18,17 +18,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; +import java.security.PrivilegedExceptionAction; import java.util.List; import junit.framework.Assert; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -87,6 +92,22 @@ private void syncNodeLost(MockNM nm) throws Exception { dispatcher.await(); } + private AllocateResponse allocate(final AllocateRequest req) throws Exception { + ApplicationAttemptId attemptId = req.getApplicationAttemptId(); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + rm.getRMContext().getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return amService.allocate(req); + } + }); + } + @Test public void testAMRMUnusableNodes() throws Exception { @@ -109,7 +130,7 @@ public void testAMRMUnusableNodes() throws Exception { // allocate request returns no updated node AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1 .getAppAttemptId(), 0, 0F, null, null, null); - AllocateResponse response1 = amService.allocate(allocateRequest1); + AllocateResponse response1 = allocate(allocateRequest1); List updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(0, updatedNodes.size()); @@ -118,7 +139,7 @@ public void testAMRMUnusableNodes() throws Exception { // allocate request returns updated node allocateRequest1 = AllocateRequest.newInstance(attempt1 .getAppAttemptId(), response1.getResponseId(), 0F, null, null, null); - response1 = amService.allocate(allocateRequest1); + response1 = allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); NodeReport nr = updatedNodes.iterator().next(); @@ -126,7 +147,7 @@ public void testAMRMUnusableNodes() throws Exception { Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); // resending the allocate request returns the same result - response1 = amService.allocate(allocateRequest1); + response1 = allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -138,7 +159,7 @@ public void testAMRMUnusableNodes() throws Exception { // subsequent allocate request returns delta allocateRequest1 = AllocateRequest.newInstance(attempt1 .getAppAttemptId(), response1.getResponseId(), 0F, null, null, null); - response1 = amService.allocate(allocateRequest1); + response1 = allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -158,7 +179,7 @@ public void testAMRMUnusableNodes() throws Exception { // allocate request returns no updated node AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2 .getAppAttemptId(), 0, 0F, null, null, null); - AllocateResponse response2 = amService.allocate(allocateRequest2); + AllocateResponse response2 = allocate(allocateRequest2); updatedNodes = response2.getUpdatedNodes(); Assert.assertEquals(0, updatedNodes.size()); @@ -167,7 +188,7 @@ public void testAMRMUnusableNodes() throws Exception { // both AM's should get delta updated nodes allocateRequest1 = AllocateRequest.newInstance(attempt1 .getAppAttemptId(), response1.getResponseId(), 0F, null, null, null); - response1 = amService.allocate(allocateRequest1); + response1 = allocate(allocateRequest1); updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -176,7 +197,7 @@ public void testAMRMUnusableNodes() throws Exception { allocateRequest2 = AllocateRequest.newInstance(attempt2 .getAppAttemptId(), response2.getResponseId(), 0F, null, null, null); - response2 = amService.allocate(allocateRequest2); + response2 = allocate(allocateRequest2); updatedNodes = response2.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); nr = updatedNodes.iterator().next(); @@ -186,7 +207,7 @@ public void testAMRMUnusableNodes() throws Exception { // subsequent allocate calls should return no updated nodes allocateRequest2 = AllocateRequest.newInstance(attempt2 .getAppAttemptId(), response2.getResponseId(), 0F, null, null, null); - response2 = amService.allocate(allocateRequest2); + response2 = allocate(allocateRequest2); updatedNodes = response2.getUpdatedNodes(); Assert.assertEquals(0, updatedNodes.size()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java index 4e309fd..860881a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java @@ -18,19 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; +import java.security.PrivilegedExceptionAction; + import junit.framework.Assert; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMCommand; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; -import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.junit.After; @@ -39,20 +40,13 @@ public class TestAMRMRPCResponseId { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - private MockRM rm; ApplicationMasterService amService = null; - private ClientRMService clientService; - - private RMContext context; @Before public void setUp() { this.rm = new MockRM(); rm.start(); - this.clientService = rm.getClientRMService(); amService = rm.getApplicationMasterService(); } @@ -63,6 +57,22 @@ public void tearDown() { } } + private AllocateResponse allocate(final AllocateRequest req) throws Exception { + ApplicationAttemptId attemptId = req.getApplicationAttemptId(); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + org.apache.hadoop.security.token.Token token = + rm.getRMContext().getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return amService.allocate(req); + } + }); + } + @Test public void testARRMResponseId() throws Exception { @@ -81,22 +91,22 @@ public void testARRMResponseId() throws Exception { AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt .getAppAttemptId(), 0, 0F, null, null, null); - AllocateResponse response = amService.allocate(allocateRequest); + AllocateResponse response = allocate(allocateRequest); Assert.assertEquals(1, response.getResponseId()); Assert.assertTrue(response.getAMCommand() == null); allocateRequest = AllocateRequest.newInstance(attempt .getAppAttemptId(), response.getResponseId(), 0F, null, null, null); - response = amService.allocate(allocateRequest); + response = allocate(allocateRequest); Assert.assertEquals(2, response.getResponseId()); /* try resending */ - response = amService.allocate(allocateRequest); + response = allocate(allocateRequest); Assert.assertEquals(2, response.getResponseId()); /** try sending old request again **/ allocateRequest = AllocateRequest.newInstance(attempt .getAppAttemptId(), 0, 0F, null, null, null); - response = amService.allocate(allocateRequest); + response = allocate(allocateRequest); Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 9a3358e..e8a65f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -296,7 +297,8 @@ public void testValidateResourceBlacklistRequest() throws Exception { UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); - + Credentials credentials = containerManager.getContainerCredentials(); + currentUser.addCredentials(credentials); ApplicationMasterProtocol client = currentUser .doAs(new PrivilegedAction() { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b39aaec..3829d66 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collection; import javax.crypto.SecretKey; @@ -26,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -46,17 +47,29 @@ import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class TestAMRMTokens { private static final Log LOG = LogFactory.getLog(TestAMRMTokens.class); - private static final Configuration confWithSecurityEnabled = - new Configuration(); - static { - confWithSecurityEnabled.set( + private final Configuration conf; + + @Parameters + public static Collection configs() { + Configuration conf = new Configuration(); + Configuration confWithSecurity = new Configuration(); + confWithSecurity.set( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(confWithSecurityEnabled); + return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity } }); + } + + public TestAMRMTokens(Configuration conf) { + this.conf = conf; + UserGroupInformation.setConfiguration(conf); } /** @@ -70,7 +83,7 @@ public void testTokenExpiry() throws Exception { MyContainerManager containerManager = new MyContainerManager(); final MockRM rm = - new MockRMWithAMS(confWithSecurityEnabled, containerManager); + new MockRMWithAMS(conf, containerManager); rm.start(); final Configuration conf = rm.getConfig(); @@ -85,11 +98,11 @@ public void testTokenExpiry() throws Exception { nm1.nodeHeartbeat(true); int waitCount = 0; - while (containerManager.amTokens == null && waitCount++ < 20) { + while (containerManager.containerTokens == null && waitCount++ < 20) { LOG.info("Waiting for AM Launch to happen.."); Thread.sleep(1000); } - Assert.assertNotNull(containerManager.amTokens); + Assert.assertNotNull(containerManager.containerTokens); RMAppAttempt attempt = app.getCurrentAppAttempt(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); @@ -98,11 +111,7 @@ public void testTokenExpiry() throws Exception { UserGroupInformation currentUser = UserGroupInformation .createRemoteUser(applicationAttemptId.toString()); - Credentials credentials = new Credentials(); - DataInputByteBuffer buf = new DataInputByteBuffer(); - containerManager.amTokens.rewind(); - buf.reset(containerManager.amTokens); - credentials.readTokenStorageStream(buf); + Credentials credentials = containerManager.getContainerCredentials(); currentUser.addCredentials(credentials); rmClient = createRMClient(rm, conf, rpc, currentUser); @@ -162,7 +171,7 @@ public void testMasterKeyRollOver() throws Exception { MyContainerManager containerManager = new MyContainerManager(); final MockRM rm = - new MockRMWithAMS(confWithSecurityEnabled, containerManager); + new MockRMWithAMS(conf, containerManager); rm.start(); final Configuration conf = rm.getConfig(); @@ -177,11 +186,11 @@ public void testMasterKeyRollOver() throws Exception { nm1.nodeHeartbeat(true); int waitCount = 0; - while (containerManager.amTokens == null && waitCount++ < 20) { + while (containerManager.containerTokens == null && waitCount++ < 20) { LOG.info("Waiting for AM Launch to happen.."); Thread.sleep(1000); } - Assert.assertNotNull(containerManager.amTokens); + Assert.assertNotNull(containerManager.containerTokens); RMAppAttempt attempt = app.getCurrentAppAttempt(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); @@ -190,11 +199,7 @@ public void testMasterKeyRollOver() throws Exception { UserGroupInformation currentUser = UserGroupInformation .createRemoteUser(applicationAttemptId.toString()); - Credentials credentials = new Credentials(); - DataInputByteBuffer buf = new DataInputByteBuffer(); - containerManager.amTokens.rewind(); - buf.reset(containerManager.amTokens); - credentials.readTokenStorageStream(buf); + Credentials credentials = containerManager.getContainerCredentials(); currentUser.addCredentials(credentials); rmClient = createRMClient(rm, conf, rpc, currentUser);