diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 0e27f32..d1b1735 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -71,7 +72,7 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens) { + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); response.setResponseId(responseId); @@ -82,6 +83,7 @@ public static AllocateResponse newInstance(int responseId, response.setAMCommand(command); response.setPreemptionMessage(preempt); response.setNMTokens(nmTokens); + response.setAMRMToken(amRMToken); return response; } @@ -91,12 +93,12 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens, + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken, List increasedContainers, List decreasedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, - numClusterNodes, 
preempt, nmTokens); + numClusterNodes, preempt, nmTokens, amRMToken); response.setIncreasedContainers(increasedContainers); response.setDecreasedContainers(decreasedContainers); return response; @@ -270,4 +272,16 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setDecreasedContainers( List decreasedContainers); + + /** + * Get the AMRMToken that belongs to this attempt. + * @return the AMRMToken that belongs to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 79f9f3a..2a65033 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -57,7 +57,7 @@ public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, Map acls, ByteBuffer key, List containersFromPreviousAttempt, String queue, - List nmTokensFromPreviousAttempts) { + List nmTokensFromPreviousAttempts, ByteBuffer amRMToken) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); @@ -66,6 +66,7 @@ public static RegisterApplicationMasterResponse newInstance( response.setContainersFromPreviousAttempts(containersFromPreviousAttempt); response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts); response.setQueue(queue); + response.setAMRMToken(amRMToken); return 
response; } @@ -180,4 +181,16 @@ public abstract void setContainersFromPreviousAttempts( @Private @Unstable public abstract void setNMTokensFromPreviousAttempts(List nmTokens); + + /** + * Get the AMRMToken that belongs to this attempt. + * @return the AMRMToken that belongs to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..d16fa47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto { repeated ContainerProto containers_from_previous_attempts = 4; optional string queue = 5; repeated NMTokenProto nm_tokens_from_previous_attempts = 6; + optional bytes am_rm_token = 7; } message FinishApplicationMasterRequestProto { @@ -85,6 +86,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; + optional bytes am_rm_token = 12; } ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 1eebaac..6431b60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,7 +40,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -61,6 +66,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -200,6 +206,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( if(!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } return response; } @@ -258,6 +267,9 @@ public AllocateResponse allocate(float progressIndicator) if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } } } finally { // TODO how to differentiate remote yarn exception vs 
error in rpc @@ -302,6 +314,33 @@ protected void populateNMTokens(List nmTokens) { } } + /** + * Decode the AMRMToken sent by the RM and add it to the current user's UGI + * (the login user's UGI when security is enabled). + * @param amRMToken token storage bytes that carry the new AMRMToken + * @throws IOException if the bytes cannot be read or hold no AMRMToken + */ + private void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + // Read from a duplicate so the buffer kept by the response is not drained. + dibb.reset(amRMToken.duplicate()); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + if (amrmToken == null) { + throw new IOException("AMRMToken is missing from the credentials sent by the RM"); + } + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + if (UserGroupInformation.isSecurityEnabled()) { + currentUGI = UserGroupInformation.getLoginUser(); + } + currentUGI.addToken(amrmToken); + LOG.info("Update AMRMToken: " + amrmToken.toString()); + } + @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 15bfa28..340cbf1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; @@ -766,7 +765,7 @@ public AllocateResponse createFakeAllocateResponse() { new ArrayList(), new ArrayList(), new ArrayList(), Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, - null, new ArrayList()); + null, new ArrayList(), null); } } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index e21c4ba..9467bc8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -36,7 +37,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -76,11 +76,12 @@ public void testAMRMClientAsync() throws Exception { List allocated1 = Arrays.asList( Container.newInstance(null, null, null, null, null, null)); final AllocateResponse response1 = createAllocateResponse( - new ArrayList(), allocated1, null); + new ArrayList(), allocated1, null, null); final AllocateResponse response2 = createAllocateResponse(completed1, - new ArrayList(), null); + new ArrayList(), null, null); final AllocateResponse emptyResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); + new ArrayList(), new ArrayList(), null + , null); TestCallbackHandler callbackHandler = new TestCallbackHandler(); final AMRMClient client = mock(AMRMClientImpl.class); @@ -211,7 +212,8 @@ public void testAMRMClientAsyncReboot() throws Exception { AMRMClient client = mock(AMRMClientImpl.class); final AllocateResponse rebootResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); + new ArrayList(), new ArrayList(), null + , null); 
rebootResponse.setAMCommand(AMCommand.AM_RESYNC); when(client.allocate(anyFloat())).thenReturn(rebootResponse); @@ -244,7 +246,8 @@ public void testAMRMClientAsyncShutDown() throws Exception { AMRMClient client = mock(AMRMClientImpl.class); final AllocateResponse shutDownResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); + new ArrayList(), new ArrayList(), null + , null); shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); when(client.allocate(anyFloat())).thenReturn(shutDownResponse); @@ -273,7 +276,7 @@ public void testCallAMRMClientAsyncStopFromCallbackHandler() ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); final AllocateResponse response = createAllocateResponse(completed, - new ArrayList(), null); + new ArrayList(), null, null); when(client.allocate(anyFloat())).thenReturn(response); @@ -305,7 +308,7 @@ void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); final AllocateResponse response = createAllocateResponse(completed, - new ArrayList(), null); + new ArrayList(), null, null); when(client.allocate(anyFloat())).thenReturn(response); AMRMClientAsync asyncClient = @@ -356,10 +359,11 @@ public void testCallBackThrowOutExceptionNoStop() throws YarnException, private AllocateResponse createAllocateResponse( List completed, List allocated, - List nmTokens) { + List nmTokens, ByteBuffer amRMToken) { AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, null, 1, null, nmTokens); + new ArrayList(), null, null, 1, null, nmTokens, + amRMToken); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 4d7c0a3..c7a745d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; @Private @@ -74,7 +76,7 @@ private List updatedNodes = null; private PreemptionMessage preempt; - + private ByteBuffer amRMToken = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -154,6 +156,9 @@ private synchronized void mergeLocalToBuilder() { getChangeProtoIterable(this.decreasedContainers); builder.addAllDecreasedContainers(iterable); } + if(this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } private synchronized void mergeLocalToProto() { @@ -357,6 +362,28 @@ public synchronized void setDecreasedContainers( this.decreasedContainers.addAll(decreasedContainers); } + @Override + public synchronized ByteBuffer getAMRMToken() { + AllocateResponseProtoOrBuilder p = viaProto ? 
proto : builder; + if(amRMToken != null) { + return amRMToken; + } + if(!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public synchronized void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if(amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { return; @@ -699,4 +726,14 @@ private synchronized NMTokenProto convertToProtoFormat(NMToken token) { private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private synchronized ByteBuffer convertFromProtoFormat( + ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private synchronized ByteString convertToProtoFormat( + ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 06a637a..548215c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -61,6 +61,7 @@ private Map applicationACLS = null; private List containersFromPreviousAttempts = null; private List nmTokens = null; + private ByteBuffer amRMToken = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -122,6 +123,9 @@ 
private void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokensFromPreviousAttempts(iterable); } + if(this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } @@ -326,6 +330,29 @@ public void setNMTokensFromPreviousAttempts(final List nmTokens) { this.nmTokens.addAll(nmTokens); } + @Override + public synchronized ByteBuffer getAMRMToken() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (amRMToken != null) { + return amRMToken; + } + if (!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public synchronized void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if(amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private synchronized void initLocalNewNMTokenList() { RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? 
proto : builder; List list = p.getNmTokensFromPreviousAttemptsList(); @@ -387,4 +414,14 @@ private NMTokenProto convertToProtoFormat(NMToken token) { private NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private synchronized ByteBuffer convertFromProtoFormat( + ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private synchronized ByteString convertToProtoFormat( + ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java index 99495d7..dfab838 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java @@ -44,6 +44,7 @@ public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN"); private ApplicationAttemptId applicationAttemptId; + private int keyId; public AMRMTokenIdentifier() { } @@ -51,6 +52,13 @@ public AMRMTokenIdentifier() { public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId) { this(); this.applicationAttemptId = appAttemptId; + this.keyId = -1; + } + + public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId, int masterKeyId) { + this(); + this.applicationAttemptId = appAttemptId; + this.keyId = masterKeyId; } @Private @@ -64,6 +72,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(appId.getClusterTimestamp()); out.writeInt(appId.getId()); out.writeInt(this.applicationAttemptId.getAttemptId()); + out.writeInt(this.keyId); } @Override @@ -75,6 +84,7 @@ public void readFields(DataInput in) throws IOException { ApplicationId.newInstance(clusterTimeStamp, appId); 
this.applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, attemptId); + this.keyId = in.readInt(); } @Override @@ -92,6 +102,10 @@ public UserGroupInformation getUser() { .toString()); } + public int getKeyId() { + return keyId; + } + // TODO: Needed? @InterfaceAudience.Private public static class Renewer extends Token.TrivialRenewer { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index a8a56d7..16e1abb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -72,7 +72,7 @@ public void testAllocateResponseWithIncDecContainers() { AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList(), new ArrayList(), new ArrayList(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList(), + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, incContainers, decContainers); // serde @@ -100,8 +100,9 @@ public void testAllocateResponseWithIncDecContainers() { public void testAllocateResponseWithoutIncDecContainers() { AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList(), - new ArrayList(), new ArrayList(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null); + new ArrayList(), new ArrayList(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null, + null); // serde AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc474..a911c3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -35,10 +36,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -82,12 +86,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateAMRMTokenEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; 
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -288,6 +294,27 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMasterKey(applicationAttemptId).getEncoded())); } + //update AMRMToken and save into RMStateStore if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKey(); + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != lock + .getPreviousAMRMTokenKeyId()) { + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAMRMToken(applicationAttemptId); + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + response.setAMRMToken(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppAttemptUpdateAMRMTokenEvent(applicationAttemptId, amrmToken)); + lock.setPreviousAMRMTokenKeyId(nextMasterKey.getMasterKey().getKeyId()); + } + // For work-preserving AM restart, retrieve previous attempts' containers // and corresponding NM tokens. 
List transferredContainers = @@ -400,7 +427,6 @@ public boolean hasApplicationMasterRegistered( @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - ApplicationAttemptId appAttemptId = authorizeRequest(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -438,7 +464,6 @@ public AllocateResponse allocate(AllocateRequest request) // get an exception. Might as well throw an exception here. return resync; } - //filter illegal progress values float filteredProgress = request.getProgress(); if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY @@ -549,6 +574,27 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + //update AMRMToken and save into RMStateStore if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKey(); + + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != lock + .getPreviousAMRMTokenKeyId()) { + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAMRMToken(appAttemptId); + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + allocateResponse.setAMRMToken(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppAttemptUpdateAMRMTokenEvent(appAttemptId, amrmToken)); + lock.setPreviousAMRMTokenKeyId(nextMasterKey.getMasterKey().getKeyId()); + } /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which @@ -644,6 +690,8 @@ protected void serviceStop() throws Exception { public static class AllocateResponseLock { private AllocateResponse response; + private int previousAMRMTokenKeyId = -1; 
+ public AllocateResponseLock(AllocateResponse response) { this.response = response; @@ -656,6 +704,15 @@ public synchronized AllocateResponse getAllocateResponse() { public synchronized void setAllocateResponse(AllocateResponse response) { this.response = response; } + + public synchronized int getPreviousAMRMTokenKeyId() { + return previousAMRMTokenKeyId; + } + + public synchronized void setPreviousAMRMTokenKeyId( + int previousAMRMTokenKeyId) { + this.previousAMRMTokenKeyId = previousAMRMTokenKeyId; + } } @VisibleForTesting diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index fc4537c..fca4761 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -421,6 +421,13 @@ public synchronized void updateApplicationAttemptState( new RMStateUpdateAppAttemptEvent(attemptState)); } + @SuppressWarnings("unchecked") + public synchronized void updateApplicationAttemptState( + ApplicationAttemptState attemptState, boolean onlyUpdateAMRMToken) { + dispatcher.getEventHandler().handle( + new RMStateUpdateAppAttemptEvent(attemptState, onlyUpdateAMRMToken)); + } + /** * Blocking API * Derived classes must implement this method to store the state of an @@ -674,8 +681,10 @@ protected void handleStoreEvent(RMStateStoreEvent event) { RMStateStoreEventType.UPDATE_APP_ATTEMPT); updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptStateData); - 
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), + if (! ((RMStateUpdateAppAttemptEvent)event).onlyUpdateAMRMToken()) { + notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), storedException); + } } } catch (Exception e) { LOG.error( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 9ded673..5dd9301 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -23,13 +23,26 @@ public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent { ApplicationAttemptState attemptState; + boolean onlyUpdateAMRMToken; public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) { super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); this.attemptState = attemptState; + this.onlyUpdateAMRMToken = false; + } + + public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState, + boolean onlyUpdateAMRMToken) { + super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); + this.attemptState = attemptState; + this.onlyUpdateAMRMToken = onlyUpdateAMRMToken; } public ApplicationAttemptState getAppAttemptState() { return attemptState; } + + public boolean onlyUpdateAMRMToken() { + return onlyUpdateAMRMToken; + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java index e1522f1..9692cbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java @@ -34,6 +34,7 @@ REGISTERED, STATUS_UPDATE, UNREGISTERED, + AMRMTOKEN_UPDATE, // Source: Containers CONTAINER_ACQUIRED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..23e4014 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -36,6 +36,7 @@ import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -88,6 +89,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateAMRMTokenEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -313,6 +315,9 @@ RMAppAttemptEventType.KILL, new FinalSavingTransition(new FinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) + .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, + RMAppAttemptEventType.AMRMTOKEN_UPDATE, + new AMRMTokenUpdateTransition()) // Transitions from FINAL_SAVING State .addTransition(RMAppAttemptState.FINAL_SAVING, @@ -327,6 +332,9 @@ RMAppAttemptEventType.EXPIRE, new AMExpiredAtFinalSavingTransition()) .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING, + RMAppAttemptEventType.AMRMTOKEN_UPDATE, + new AMRMTokenUpdateTransition()) + .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING, EnumSet.of( RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.STATUS_UPDATE, @@ -352,7 +360,8 @@ RMAppAttemptEventType.KILL, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.STATUS_UPDATE, - RMAppAttemptEventType.CONTAINER_ALLOCATED)) + RMAppAttemptEventType.CONTAINER_ALLOCATED, + RMAppAttemptEventType.AMRMTOKEN_UPDATE)) // Transitions from FINISHING State .addTransition(RMAppAttemptState.FINISHING, @@ -362,6 +371,10 @@ .addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED, RMAppAttemptEventType.EXPIRE, new FinalTransition(RMAppAttemptState.FINISHED)) + 
.addTransition(RMAppAttemptState.FINISHING, + RMAppAttemptState.FINISHING, + RMAppAttemptEventType.AMRMTOKEN_UPDATE, + new AMRMTokenUpdateTransition()) .addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING, EnumSet.of( RMAppAttemptEventType.UNREGISTERED, @@ -380,7 +393,8 @@ RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.CONTAINER_ALLOCATED, RMAppAttemptEventType.CONTAINER_FINISHED, - RMAppAttemptEventType.KILL)) + RMAppAttemptEventType.KILL, + RMAppAttemptEventType.AMRMTOKEN_UPDATE)) // Transitions from KILLED State .addTransition( @@ -396,7 +410,8 @@ RMAppAttemptEventType.CONTAINER_FINISHED, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, - RMAppAttemptEventType.STATUS_UPDATE)) + RMAppAttemptEventType.STATUS_UPDATE, + RMAppAttemptEventType.AMRMTOKEN_UPDATE)) .installTopology(); public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, @@ -545,7 +560,12 @@ public SecretKey getClientTokenMasterKey() { @Override public Token getAMRMToken() { - return this.amrmToken; + try { + this.readLock.lock(); + return this.amrmToken; + } finally { + this.readLock.unlock(); + } } @Override @@ -761,11 +781,9 @@ public void transition(RMAppAttemptImpl appAttempt, } // create AMRMToken - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(appAttempt.applicationAttemptId); appAttempt.amrmToken = - new Token(id, - appAttempt.rmContext.getAMRMTokenSecretManager()); + appAttempt.rmContext.getAMRMTokenSecretManager().createAMRMToken( + appAttempt.applicationAttemptId); // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. 
@@ -1565,6 +1583,25 @@ public AMFinishedAfterFinalSavingTransition( } } + private static final class AMRMTokenUpdateTransition extends BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + RMAppAttemptUpdateAMRMTokenEvent amRMTokenUpdateEvent = + (RMAppAttemptUpdateAMRMTokenEvent) event; + appAttempt.updateAMRMToken(amRMTokenUpdateEvent.getAmrmToken()); + RMStateStore rmStore = appAttempt.rmContext.getStateStore(); + Credentials credentials = rmStore.getCredentialsFromAppAttempt(appAttempt); + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), credentials, + appAttempt.getStartTime()); + LOG.info("Updating application attempt " + appAttempt.getAppAttemptId() + + " with updated AMRMToken"); + rmStore.updateApplicationAttemptState(attemptState, true); + } + } + @Override public long getStartTime() { this.readLock.lock(); @@ -1663,4 +1700,13 @@ public ApplicationAttemptReport createApplicationAttemptReport() { } return attemptReport; } + + private void updateAMRMToken(Token amrmToken) { + try { + this.writeLock.lock(); + this.amrmToken = amrmToken; + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateAMRMTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateAMRMTokenEvent.java new file mode 100644 index 0000000..f988b98 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateAMRMTokenEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; + +public class RMAppAttemptUpdateAMRMTokenEvent extends RMAppAttemptEvent { + + private final Token amrmToken; + + public RMAppAttemptUpdateAMRMTokenEvent(ApplicationAttemptId appAttemptId, + Token amrmToken) { + super(appAttemptId, RMAppAttemptEventType.AMRMTOKEN_UPDATE); + this.amrmToken = amrmToken; + } + + public Token getAmrmToken() { + return amrmToken; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java index 5d21ec0..6a6841c 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java @@ -19,22 +19,26 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.io.IOException; +import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.TimerTask; - -import javax.crypto.SecretKey; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.security.MasterKeyData; /** * AMRM-tokens are per ApplicationAttempt. 
If users redistribute their @@ -49,12 +53,20 @@ private static final Log LOG = LogFactory .getLog(AMRMTokenSecretManager.class); - private SecretKey masterKey; + private int serialNo = new SecureRandom().nextInt(); + + protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + protected final Lock readLock = readWriteLock.readLock(); + protected final Lock writeLock = readWriteLock.writeLock(); + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + private final Timer timer; private final long rollingInterval; + private final long activationDelay; - private final Map passwords = - new HashMap(); + private final Map passwords = + new HashMap(); /** * Create an {@link AMRMTokenSecretManager} @@ -67,6 +79,21 @@ public AMRMTokenSecretManager(Configuration conf) { .getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. 
+ this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + + "ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 2 X " + + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS); + } } public void start() { @@ -77,12 +104,17 @@ public void stop() { this.timer.cancel(); } - public synchronized void applicationMasterFinished( - ApplicationAttemptId appAttemptId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Application finished, removing password for " + appAttemptId); + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { + try { + this.writeLock.lock(); + if (LOG.isDebugEnabled()) { + LOG + .debug("Application finished, removing password for " + appAttemptId); + } + this.passwords.remove(appAttemptId); + } finally { + this.writeLock.unlock(); } - this.passwords.remove(appAttemptId); } private class MasterKeyRoller extends TimerTask { @@ -93,19 +125,55 @@ public void run() { } @Private - public synchronized void setMasterKey(SecretKey masterKey) { - this.masterKey = masterKey; + public void rollMasterKey() { + try { + this.writeLock.lock(); + LOG.info("Rolling master-key for AMRM-tokens"); + if (this.currentMasterKey == null) { // Setting up for the first time. 
+ this.currentMasterKey = createNewMasterKey(); + } else { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + LOG.info("Going to activate master-key with key-id " + + this.nextMasterKey.getMasterKey().getKeyId() + " in " + + this.activationDelay + "ms"); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } + } finally { + this.writeLock.unlock(); + } } - @Private - public synchronized SecretKey getMasterKey() { - return this.masterKey; + private MasterKeyData createNewMasterKey() { + try { + this.writeLock.lock(); + return new MasterKeyData(serialNo++, generateSecret()); + } finally { + this.writeLock.unlock(); + } } + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } + } + + /** + * Activate the new master-key + */ @Private - synchronized void rollMasterKey() { - LOG.info("Rolling master-key for amrm-tokens"); - this.masterKey = generateSecret(); + public void activateNextMasterKey() { + try { + this.writeLock.lock(); + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + } finally { + this.writeLock.unlock(); + } } /** @@ -113,29 +181,41 @@ synchronized void rollMasterKey() { * send to the AppicationAttempt which can give it back during authentication. 
*/ @Override - public synchronized byte[] createPassword( - AMRMTokenIdentifier identifier) { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating password for " + applicationAttemptId); + public byte[] createPassword(AMRMTokenIdentifier identifier) { + try { + this.readLock.lock(); + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating password for " + applicationAttemptId); + } + byte[] password = + createPassword(identifier.getBytes(), getMasterKey().getSecretKey()); + this.passwords.put(identifier.getApplicationAttemptId(), + new PasswordInfo(password, identifier.getKeyId())); + return password; + } finally { + this.readLock.unlock(); } - byte[] password = createPassword(identifier.getBytes(), masterKey); - this.passwords.put(applicationAttemptId, password); - return password; } /** * Populate persisted password of AMRMToken back to AMRMTokenSecretManager. */ - public synchronized void - addPersistedPassword(Token token) throws IOException { - AMRMTokenIdentifier identifier = token.decodeIdentifier(); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + public void addPersistedPassword(Token token) + throws IOException { + try { + this.writeLock.lock(); + AMRMTokenIdentifier identifier = token.decodeIdentifier(); + if (LOG.isDebugEnabled()) { + LOG + .debug("Adding password for " + identifier.getApplicationAttemptId()); + } + this.passwords.put(identifier.getApplicationAttemptId(), + new PasswordInfo(token.getPassword(), identifier.getKeyId())); + } finally { + this.writeLock.unlock(); } - this.passwords.put(identifier.getApplicationAttemptId(), - token.getPassword()); } /** @@ -143,19 +223,45 @@ synchronized void rollMasterKey() { * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}. 
*/ @Override - public synchronized byte[] retrievePassword( - AMRMTokenIdentifier identifier) throws InvalidToken { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to retrieve password for " + applicationAttemptId); + public byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + try { + this.readLock.lock(); + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + applicationAttemptId); + } + PasswordInfo password = this.passwords.get(applicationAttemptId); + if (password == null) { + throw new InvalidToken("Password not found for ApplicationAttempt " + + applicationAttemptId); + } + if (password.getKeyId() == identifier.getKeyId()) { + return password.getPassword(); + } else if (password.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + byte[] pwd = + createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + this.passwords.put(applicationAttemptId, new PasswordInfo(pwd, + identifier.getKeyId())); + return pwd; + } else if (password.getKeyId() == this.nextMasterKey.getMasterKey() + .getKeyId()) { + byte[] pwd = + createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + this.passwords.put(applicationAttemptId, new PasswordInfo(pwd, + identifier.getKeyId())); + return pwd; + } + throw new InvalidToken("Given AMRMToken for application : " + + applicationAttemptId.toString() + + " seems to have been generated illegally."); + } finally { + this.readLock.unlock(); } - byte[] password = this.passwords.get(applicationAttemptId); - if (password == null) { - throw new InvalidToken("Password not found for ApplicationAttempt " - + applicationAttemptId); - } - return password; } /** @@ -167,4 +273,66 @@ public AMRMTokenIdentifier createIdentifier() { return new AMRMTokenIdentifier(); } + public 
Token createAMRMToken( + ApplicationAttemptId appAttemptId) { + try { + this.readLock.lock(); + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey() + .getKeyId()); + byte[] password = this.createPassword(identifier); + LOG.info("Create a AMRMToken for: " + identifier + ", with the password: " + password.toString()); + return new Token(identifier.getBytes(), password, + identifier.getKind(), new Text()); + } finally { + this.readLock.unlock(); + } + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey + @Private + public MasterKeyData getMasterKey() { + try { + this.readLock.lock(); + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Private + public MasterKeyData getNextMasterKey() { + try { + this.readLock.lock(); + return this.nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Private + public MasterKeyData getCurrentMasterKey() { + try { + this.readLock.lock(); + return this.currentMasterKey; + } finally { + this.readLock.unlock(); + } + } + + private class PasswordInfo { + private final byte[] password; + private final int keyId; + public PasswordInfo(byte[] password, int keyId) { + this.password = password; + this.keyId = keyId; + } + public byte[] getPassword() { + return password; + } + public int getKeyId() { + return keyId; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java index 0ea2b5e..290f805 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; public class MockRMWithCustomAMLauncher extends MockRM { @@ -46,6 +47,12 @@ public MockRMWithCustomAMLauncher(Configuration conf, this.containerManager = containerManager; } + public MockRMWithCustomAMLauncher(Configuration conf, + ContainerManagementProtocol containerManager, RMStateStore store) { + super(conf, store); + this.containerManager = containerManager; + } + @Override protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(getRMContext()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index c7f0d0a..8e2d0c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -138,6 +139,11 @@ public MockRMWithAMS(Configuration conf, ContainerManagementProtocol containerMa super(conf, containerManager); } + public MockRMWithAMS(Configuration conf, + ContainerManagementProtocol containerManager, RMStateStore store) { + super(conf, containerManager, store); + } + @Override protected void doSecureLogin() throws IOException { // Skip the login. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 49eff8b..d877965 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1613,8 +1613,8 @@ public void testClientRetryOnKillingApplication() throws Exception { rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED); rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); - Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt); - Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp); + Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateAttempt); + Assert.assertEquals(3, ((TestMemoryRMStateStore) memStore).updateApp); } @SuppressWarnings("resource") diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index 64602bd..be39085 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -22,6 +22,7 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; +import java.util.Map; import javax.crypto.SecretKey; @@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -38,14 +40,22 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -198,12 +208,22 @@ public void testTokenExpiry() throws Exception { * * @throws Exception */ + @SuppressWarnings("unchecked") @Test public void testMasterKeyRollOver() throws Exception { + this.conf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); MyContainerManager containerManager = new MyContainerManager(); final MockRMWithAMS rm = - new MockRMWithAMS(conf, containerManager); + new MockRMWithAMS(conf, containerManager, memStore); rm.start(); final Configuration conf = rm.getConfig(); @@ -215,6 +235,10 @@ public void testMasterKeyRollOver() throws Exception { RMApp app = rm.submitApp(1024); + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + nm1.nodeHeartbeat(true); int waitCount = 0; @@ -227,6 +251,10 @@ public void testMasterKeyRollOver() throws Exception { RMAppAttempt attempt = app.getCurrentAppAttempt(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); + // assert attempt info is saved + ApplicationAttemptState attemptState = appState.getAttempt(applicationAttemptId); + Assert.assertNotNull(attemptState); + // Create a client to the RM. 
UserGroupInformation currentUser = UserGroupInformation @@ -253,16 +281,64 @@ public void testMasterKeyRollOver() throws Exception { // Simulate a master-key-roll-over AMRMTokenSecretManager appTokenSecretManager = rm.getRMContext().getAMRMTokenSecretManager(); - SecretKey oldKey = appTokenSecretManager.getMasterKey(); + SecretKey oldKey = + appTokenSecretManager.getCurrentMasterKey().getSecretKey(); + + // Manually call rollMasterKey. It will create a newKey + // which will be assigned to nextMasterKey appTokenSecretManager.rollMasterKey(); - SecretKey newKey = appTokenSecretManager.getMasterKey(); + SecretKey newKey = + appTokenSecretManager.getNextMasterKey().getSecretKey(); + Assert.assertTrue(newKey != null); + + // Do another allocate call, the new AMRMToken should be + // saved in RMStateStore + allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); + rmClient.allocate(allocateRequest); + Token recoveredToken = null; + + int count = 0; + while (count < 10) { + recoveredToken = + (Token) rmAppState.get(app.getApplicationId()) + .getAttempt(applicationAttemptId).getAppAttemptCredentials() + .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); + if (recoveredToken.decodeIdentifier().getKeyId() + == appTokenSecretManager.getNextMasterKey().getMasterKey() + .getKeyId()) { + break; + } + count++; + Thread.sleep(200); + } + + Assert.assertTrue(recoveredToken.decodeIdentifier().getKeyId() + == appTokenSecretManager.getNextMasterKey().getMasterKey().getKeyId()); + // Manually call activateNextMasterKey. 
Will replace + // currentMasterKey with nextMasterKey + appTokenSecretManager.activateNextMasterKey(); + SecretKey currentKeyAfterActive = + appTokenSecretManager.getCurrentMasterKey().getSecretKey(); Assert.assertFalse("Master key should have changed!", - oldKey.equals(newKey)); + oldKey.equals(currentKeyAfterActive)); + Assert.assertTrue(newKey.equals(currentKeyAfterActive)); + + // renew the token for currentUser + Token newAMRMToken = + appTokenSecretManager.createAMRMToken(applicationAttemptId); + SecurityUtil.setTokenService(newAMRMToken, rmBindAddress); + currentUser.addToken(newAMRMToken); + for (Token token : currentUser.getTokens()) { + if (token.decodeIdentifier() instanceof AMRMTokenIdentifier) { + Assert.assertFalse(token.getPassword() + .equals(amRMToken.getPassword())); + } + } // Another allocate call. Should continue to work. rpc.stopProxy(rmClient, conf); // To avoid using cached client rmClient = createRMClient(rm, conf, rpc, currentUser); - allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); Assert.assertTrue( rmClient.allocate(allocateRequest).getAMCommand() == null); } finally {