diff --git hadoop-common-project/hadoop-common/src/main/proto/Security.proto hadoop-common-project/hadoop-common/src/main/proto/Security.proto index 5ff571d..ba23c36 100644 --- hadoop-common-project/hadoop-common/src/main/proto/Security.proto +++ hadoop-common-project/hadoop-common/src/main/proto/Security.proto @@ -44,6 +44,7 @@ message GetDelegationTokenRequestProto { message GetDelegationTokenResponseProto { optional hadoop.common.TokenProto token = 1; + optional bool is_obtained = 2 [default = false]; } message RenewDelegationTokenRequestProto { @@ -51,13 +52,15 @@ message RenewDelegationTokenRequestProto { } message RenewDelegationTokenResponseProto { - required uint64 newExpiryTime = 1; + optional uint64 newExpiryTime = 1; + optional bool is_renewed = 2 [default = false]; } message CancelDelegationTokenRequestProto { required hadoop.common.TokenProto token = 1; } -message CancelDelegationTokenResponseProto { // void response +message CancelDelegationTokenResponseProto { + optional bool is_canceled = 1 [default = false]; } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java index aeb20cd..eec4537 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java @@ -104,6 +104,7 @@ public void testClusterGetDelegationToken() throws Exception { rmDTToken.setKind("Testclusterkind"); rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes())); rmDTToken.setService("0.0.0.0:8032"); + getDTResponse.setIsObtained(true); getDTResponse.setRMDelegationToken(rmDTToken); final ApplicationClientProtocol cRMProtocol = mock(ApplicationClientProtocol.class); when(cRMProtocol.getDelegationToken(any( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 0abaafb..7049415 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -298,13 +298,22 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls( /** *

The interface used by clients to get delegation token, enabling the - * containers to be able to talk to the service using those tokens. - * - *

The ResourceManager responds with the delegation - * {@link Token} that can be used by the client to speak to this - * service. + * containers to be able to talk to the service using those tokens.

+ * + *

+ * The response includes: + *

+ * Note: users have to wait until this flag becomes true to get the + * {@link Token}. + *

* @param request request to get a delegation token for the client. - * @return delegation token that can be used to talk to this service + * @return the response including a flag and a delegation token that can be + * used to talk to this service * @throws YarnException * @throws IOException */ @@ -316,9 +325,20 @@ public GetDelegationTokenResponse getDelegationToken( /** * Renew an existing delegation {@link Token}. - * + * + *

+ * The response includes: + *

+ * Note: users have to wait until this flag becomes true to get new expiry + * time. + *

* @param request the delegation token to be renewed. - * @return the new expiry time for the delegation token. + * @return the response including a flag and a new expiry time for the + * delegation token. * @throws YarnException * @throws IOException */ @@ -330,9 +350,18 @@ public RenewDelegationTokenResponse renewDelegationToken( /** * Cancel an existing delegation {@link Token}. - * + * + *

+ * The response includes: + *

+ * Note: users have to wait until this flag becomes true to confirm + * the delegation token is canceled. + *

* @param request the delegation token to be cancelled. - * @return an empty response. + * @return the response including a flag. * @throws YarnException * @throws IOException */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java index 495d03e..d605c60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; @@ -31,9 +33,27 @@ public abstract class CancelDelegationTokenResponse { @Private @Unstable - public static CancelDelegationTokenResponse newInstance() { + public static CancelDelegationTokenResponse newInstance(boolean isCanceled) { CancelDelegationTokenResponse response = Records.newRecord(CancelDelegationTokenResponse.class); + response.setIsCanceled(isCanceled); return response; } + + /** + * Get the flag which indicates that the process of canceling delegation + * token is completed or not. + */ + @Public + @Stable + public abstract boolean getIsCanceled(); + + /** + * Set the flag which indicates that the process of canceling delegation + * token is completed or not. + */ + @Private + @Unstable + public abstract void setIsCanceled(boolean isCanceled); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java index bb7fcf0..9613619 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java @@ -39,14 +39,32 @@ @Private @Unstable - public static GetDelegationTokenResponse newInstance(Token rmDTToken) { + public static GetDelegationTokenResponse newInstance( + boolean isObtained, Token rmDTToken) { GetDelegationTokenResponse response = Records.newRecord(GetDelegationTokenResponse.class); + response.setIsObtained(isObtained); response.setRMDelegationToken(rmDTToken); return response; } /** + * Get the flag which indicates that the process of getting delegation + * token is completed or not. + */ + @Public + @Stable + public abstract boolean getIsObtained(); + + /** + * Set the flag which indicates that the process of getting delegation + * token is completed or not. + */ + @Private + @Unstable + public abstract void setIsObtained(boolean isObtained); + + /** * The Delegation tokens have a identifier which maps to * {@link AbstractDelegationTokenIdentifier}. * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java index f0f7fad..5c449f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; @@ -31,18 +33,43 @@ @Private @Unstable - public static RenewDelegationTokenResponse newInstance(long expTime) { + public static RenewDelegationTokenResponse newInstance( + boolean isRenewed, long expTime) { RenewDelegationTokenResponse response = Records.newRecord(RenewDelegationTokenResponse.class); + response.setIsRenewed(isRenewed); response.setNextExpirationTime(expTime); return response; } + /** + * Get the flag which indicates that the process of renewing delegation + * token is completed or not. + */ + @Public + @Stable + public abstract boolean getIsRenewed(); + + /** + * Set the flag which indicates that the process of renewing delegation + * token is completed or not. + */ + @Private + @Unstable + public abstract void setIsRenewed(boolean isRenewed); + + /** + * Get the new expiration time. + */ @Private @Unstable public abstract long getNextExpirationTime(); + /** + * Set the new expiration time + */ @Private @Unstable public abstract void setNextExpirationTime(long expTime); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e30434f..ebac668 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -610,12 +610,24 @@ RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; - + /** Delegation Token renewer thread count */ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = RM_PREFIX + "delegation-token-renewer.thread-count"; public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50; + /** Interval at which the thread of cleanup delegation token operations runs */ + public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS = + RM_PREFIX + "delegation-token-operation.cleanup-interval-ms"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS = + 60000l; + + /** The max life time of a delegation token operation after its finish before its cleanup */ + public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS = + RM_PREFIX + "delegation-token-operation.cleanup-delay-ms"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS = + 30000l; + /** Whether to enable log aggregation */ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index feb3bb7..5b24558 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -355,8 +355,25 @@ public Token getRMDelegationToken(Text renewer) GetDelegationTokenRequest rmDTRequest = Records.newRecord(GetDelegationTokenRequest.class); rmDTRequest.setRenewer(renewer.toString()); - GetDelegationTokenResponse response = - rmClient.getDelegationToken(rmDTRequest); + + GetDelegationTokenResponse response; + int pollCount = 0; + while (true) { + response = rmClient.getDelegationToken(rmDTRequest); + if (response.getIsObtained()) { + break; + } + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Get the RM delegation token is not finished"); + } + try { + Thread.sleep(asyncApiPollIntervalMillis); + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for getting the delegation token"); + } + } return response.getRMDelegationToken(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 7c34966..adc940f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.client.api.impl; -import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -38,12 +37,15 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -70,6 +72,7 @@ import org.apache.log4j.Logger; import org.junit.Test; + public class TestYarnClient { @Test @@ -93,7 +96,7 @@ public void testClientStop() { @SuppressWarnings("deprecation") @Test (timeout = 30000) - public void testSubmitApplication() { + public void testSubmitApplication() throws Exception { Configuration conf = new Configuration(); conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, 100); // speed up tests @@ -148,6 +151,21 @@ public void testKillApplication() throws Exception { .forceKillApplication(any(KillApplicationRequest.class)); } + @SuppressWarnings("resource") + @Test + public void testGetDelegationToken() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + 100); // speed up tests + final YarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + + client.getRMDelegationToken(new Text("test renewer")); + verify(((MockYarnClient) client).rmClient, times(5)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + } + @Test(timeout = 30000) public void testApplicationType() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -265,6 +283,18 @@ public void start() { Assert.fail("Exception is not expected."); } when(mockResponse.getApplicationReport()).thenReturn(mockReport); + + try { + when(rmClient.getDelegationToken( + any(GetDelegationTokenRequest.class))).thenReturn( + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(true, null)); + } catch (Exception e) { + Assert.fail(); // shouldn't happen with Mockito + } } public ApplicationClientProtocol getRMClient() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java index ec2b2b2..791cc66 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java @@ -20,16 +20,20 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProtoOrBuilder; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import com.google.protobuf.TextFormat; @Private @Unstable -public class CancelDelegationTokenResponsePBImpl extends CancelDelegationTokenResponse { +public class CancelDelegationTokenResponsePBImpl + extends CancelDelegationTokenResponse { CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto .getDefaultInstance(); + CancelDelegationTokenResponseProto.Builder builder = null; + boolean viaProto = false; public CancelDelegationTokenResponsePBImpl() { } @@ -39,6 +43,19 @@ public CancelDelegationTokenResponsePBImpl( this.proto = proto; } + @Override + public boolean getIsCanceled() { + CancelDelegationTokenResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsCanceled(); + } + + @Override + public void setIsCanceled(boolean isCanceled) { + maybeInitBuilder(); + builder.setIsCanceled(isCanceled); + } + public CancelDelegationTokenResponseProto getProto() { return proto; } @@ -62,4 +79,12 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CancelDelegationTokenResponseProto.newBuilder(proto); + } + viaProto = false; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java index 93f4b5b..9502b78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java @@ -72,6 +72,19 @@ public void setRMDelegationToken(Token appToken) { this.appToken = appToken; } + @Override + public boolean getIsObtained() { + GetDelegationTokenResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsObtained(); + } + + @Override + public void setIsObtained(boolean isObtained) { + maybeInitBuilder(); + builder.setIsObtained(isObtained); + } + public GetDelegationTokenResponseProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -128,4 +141,5 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java index 9d20b46..2007f5a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java @@ -89,4 +89,18 @@ public void setNextExpirationTime(long expTime) { maybeInitBuilder(); builder.setNewExpiryTime(expTime); } + + @Override + public boolean getIsRenewed() { + RenewDelegationTokenResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsRenewed(); + } + + @Override + public void setIsRenewed(boolean isRenewed) { + maybeInitBuilder(); + builder.setIsRenewed(isRenewed); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenOperationListener.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenOperationListener.java new file mode 100644 index 0000000..10abcaa --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenOperationListener.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.security; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +/** + * The interface to handle the inquiry of RM delegation token operation + */ +@SuppressWarnings("rawtypes") +@Private +@Unstable +public interface RMDelegationTokenOperationListener + extends EventHandler { + + static enum DelegationTokenOperationType { + GET, + RENEW, + CANCEL + } + + static enum DelegationTokenOperationState { + NEW, + PROGRESSING, + FINISHED + } + + DelegationTokenOperationState getAndInitGetDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier); + + DelegationTokenOperationState getAndInitCancelDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier); + + DelegationTokenOperationState getAndInitRenewDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier); + + Token pollGetDelegationTokenResponse( + RMDelegationTokenIdentifier rmDTIdentifier); + + void pollCancelDelegationTokenResponse( + RMDelegationTokenIdentifier rmDTIdentifier); + + long pollRenewDelegationTokenResponse( + RMDelegationTokenIdentifier rmDTIdentifier); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java index 418ccb2..1959541 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java @@ -36,10 +36,16 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.RMDelegationTokenOperationListener; +import org.apache.hadoop.yarn.security.RMDelegationTokenOperationListener.DelegationTokenOperationState; import org.apache.hadoop.yarn.util.Records; +import org.mortbay.log.Log; /** * Delegation Token Identifier that identifies the delegation tokens from the @@ -98,20 +104,45 @@ public static void setSecretManager( public long renew(Token token, Configuration conf) throws IOException, InterruptedException { final ApplicationClientProtocol rmClient = getRmClient(token, conf); + long asyncApiPollIntervalMillis = + conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); if (rmClient != null) { try { - RenewDelegationTokenRequest request = - Records.newRecord(RenewDelegationTokenRequest.class); - request.setDelegationToken(convertToProtoToken(token)); - return rmClient.renewDelegationToken(request).getNextExpirationTime(); + while (true) { + RenewDelegationTokenRequest request = + Records.newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + RenewDelegationTokenResponse response = + rmClient.renewDelegationToken(request); + if (response.getIsRenewed()) { + return response.getNextExpirationTime(); + } + Thread.sleep(asyncApiPollIntervalMillis); + } } catch (YarnException e) { throw new IOException(e); } finally { RPC.stopProxy(rmClient); } } else { - return localSecretManager.renewToken( - (Token)token, getRenewer(token)); + if (localSecretManager instanceof RMDelegationTokenOperationListener) { + RMDelegationTokenOperationListener listener = + (RMDelegationTokenOperationListener) localSecretManager; + RMDelegationTokenIdentifier rmDTIdentifier = + ((Token)token).decodeIdentifier(); + listener.getAndInitRenewDelegationTokenOperationState(rmDTIdentifier); + localSecretManager.renewToken( + (Token) token, getRenewer(token)); + while (listener.getAndInitRenewDelegationTokenOperationState(rmDTIdentifier) + != DelegationTokenOperationState.FINISHED) { + Thread.sleep(asyncApiPollIntervalMillis); + } + return listener.pollRenewDelegationTokenResponse(rmDTIdentifier); + } else { + return localSecretManager.renewToken( + (Token) token, getRenewer(token)); + } } } @@ -120,20 +151,45 @@ public long renew(Token token, Configuration conf) throws IOException, public void cancel(Token token, Configuration conf) throws IOException, InterruptedException { final ApplicationClientProtocol rmClient = getRmClient(token, conf); + long asyncApiPollIntervalMillis = + conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); if (rmClient != null) { try { - CancelDelegationTokenRequest request = - Records.newRecord(CancelDelegationTokenRequest.class); - request.setDelegationToken(convertToProtoToken(token)); - rmClient.cancelDelegationToken(request); + while (true) { + CancelDelegationTokenRequest request = + Records.newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + CancelDelegationTokenResponse response = + rmClient.cancelDelegationToken(request); + if (response.getIsCanceled()) { + break; + } + Thread.sleep(asyncApiPollIntervalMillis); + } } catch (YarnException e) { throw new IOException(e); } finally { RPC.stopProxy(rmClient); } } else { - localSecretManager.cancelToken( - (Token)token, getRenewer(token)); + if (localSecretManager instanceof RMDelegationTokenOperationListener) { + RMDelegationTokenOperationListener listener = + (RMDelegationTokenOperationListener) localSecretManager; + RMDelegationTokenIdentifier rmDTIdentifier = + ((Token) token).decodeIdentifier(); + listener.getAndInitCancelDelegationTokenOperationState(rmDTIdentifier); + localSecretManager.cancelToken( + (Token) token, getRenewer(token)); + while (listener.getAndInitCancelDelegationTokenOperationState(rmDTIdentifier) + != DelegationTokenOperationState.FINISHED) { + Thread.sleep(asyncApiPollIntervalMillis); + } + listener.pollCancelDelegationTokenResponse(rmDTIdentifier); + } else { + localSecretManager.cancelToken( + (Token) token, getRenewer(token)); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cc8b124..1845d72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -524,6 +524,20 @@ + Interval at which the thread of cleanup delegation token + operations runs + yarn.resourcemanager.delegation-token-operation.cleanup-interval-ms + 60000 + + + + The max life time of a delegation token operation after its + finish before its cleanup + yarn.resourcemanager.delegation-token-operation.cleanup-delay-ms + 30000 + + + Interval for the roll over for the master key used to generate application tokens diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index fdde381..3485ab8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -105,7 +105,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Records; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Futures; @@ -651,8 +650,6 @@ public GetDelegationTokenResponse getDelegationToken( "Delegation Token can be issued only with kerberos authentication"); } - GetDelegationTokenResponse response = - recordFactory.newRecordInstance(GetDelegationTokenResponse.class); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); Text owner = new Text(ugi.getUserName()); Text realUser = null; @@ -662,17 +659,25 @@ public GetDelegationTokenResponse getDelegationToken( RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser); - Token realRMDTtoken = - new Token(tokenIdentifier, - this.rmDTSecretManager); - response.setRMDelegationToken( - BuilderUtils.newDelegationToken( - realRMDTtoken.getIdentifier(), - realRMDTtoken.getKind().toString(), - realRMDTtoken.getPassword(), - realRMDTtoken.getService().toString() - )); - return response; + switch (rmDTSecretManager.getAndInitGetDelegationTokenOperationState(tokenIdentifier)) { + case NEW: + rmDTSecretManager.holdToken(tokenIdentifier, + new Token( + tokenIdentifier, rmDTSecretManager)); + return GetDelegationTokenResponse.newInstance(false, null); + case PROGRESSING: + return GetDelegationTokenResponse.newInstance(false, null); + case FINISHED: + default: + Token delegationToken = + rmDTSecretManager.pollGetDelegationTokenResponse(tokenIdentifier); + return GetDelegationTokenResponse.newInstance(true, + BuilderUtils.newDelegationToken( + delegationToken.getIdentifier(), + delegationToken.getKind().toString(), + delegationToken.getPassword(), + delegationToken.getService().toString())); + } } catch(IOException io) { throw RPCUtil.getRemoteException(io); } @@ -692,12 +697,20 @@ public RenewDelegationTokenResponse renewDelegationToken( protoToken.getIdentifier().array(), protoToken.getPassword().array(), new Text(protoToken.getKind()), new Text(protoToken.getService())); - String user = getRenewerForToken(token); - long nextExpTime = rmDTSecretManager.renewToken(token, user); - RenewDelegationTokenResponse renewResponse = Records - .newRecord(RenewDelegationTokenResponse.class); - renewResponse.setNextExpirationTime(nextExpTime); - return renewResponse; + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + switch (rmDTSecretManager.getAndInitRenewDelegationTokenOperationState(rmDTIdentifier)) { + case NEW: + String user = getRenewerForToken(token); + rmDTSecretManager.renewToken(token, user); + return RenewDelegationTokenResponse.newInstance(false, Long.MIN_VALUE); + case PROGRESSING: + return RenewDelegationTokenResponse.newInstance(false, Long.MIN_VALUE); + case FINISHED: + default: + long expTime = rmDTSecretManager.pollRenewDelegationTokenResponse( + rmDTIdentifier); + return RenewDelegationTokenResponse.newInstance(true, expTime); + } } catch (IOException e) { throw RPCUtil.getRemoteException(e); } @@ -716,9 +729,19 @@ public CancelDelegationTokenResponse cancelDelegationToken( protoToken.getIdentifier().array(), protoToken.getPassword().array(), new Text(protoToken.getKind()), new Text(protoToken.getService())); - String user = getRenewerForToken(token); - rmDTSecretManager.cancelToken(token, user); - return Records.newRecord(CancelDelegationTokenResponse.class); + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + switch (rmDTSecretManager.getAndInitCancelDelegationTokenOperationState(rmDTIdentifier)) { + case NEW: + String user = getRenewerForToken(token); + rmDTSecretManager.cancelToken(token, user); + return CancelDelegationTokenResponse.newInstance(false); + case PROGRESSING: + return CancelDelegationTokenResponse.newInstance(false); + case FINISHED: + default: + rmDTSecretManager.pollCancelDelegationTokenResponse(rmDTIdentifier); + return CancelDelegationTokenResponse.newInstance(true); + } } catch (IOException e) { throw RPCUtil.getRemoteException(e); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java index 9fdde65..3bedd17 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java @@ -18,7 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -27,9 +28,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; public class RMSecretManagerService extends AbstractService { @@ -66,6 +68,8 @@ public RMSecretManagerService(Configuration conf, RMContextImpl rmContext) { rmDTSecretManager = createRMDelegationTokenSecretManager(conf, rmContext); rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager); + rmContext.getDispatcher().register( + RMDelegationTokenEventType.class, rmDTSecretManager); } @Override @@ -137,7 +141,7 @@ protected RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new RMDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext); + tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext, conf); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 05bfb3b..9121d814 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -61,6 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType; @Private @Unstable @@ -435,18 +437,17 @@ protected abstract void updateApplicationAttemptStateInternal( ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; /** - * RMDTSecretManager call this to store the state of a delegation token + * Non-blocking + * API RMDTSecretManager call this to store the state of a delegation token * and sequence number */ + @SuppressWarnings("unchecked") public synchronized void storeRMDelegationTokenAndSequenceNumber( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) { - try { - storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, - latestSequenceNumber); - } catch (Exception e) { - notifyStoreOperationFailed(e); - } + dispatcher.getEventHandler().handle( + new RMStateStoreRMDelegationTokenAndSequenceNumberEvent( + rmDTIdentifier, renewDate, latestSequenceNumber)); } /** @@ -459,15 +460,29 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState( int latestSequenceNumber) throws Exception; /** - * RMDTSecretManager call this to remove the state of a delegation token + * Non-blocking + * API RMDTSecretManager call this to update the state of a delegation token + * and sequence number + */ + @SuppressWarnings("unchecked") + public synchronized void updateRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) { + dispatcher.getEventHandler().handle( + new RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent( + rmDTIdentifier, renewDate, latestSequenceNumber)); + } + + /** + * Non-blocking + * API RMDTSecretManager call this to remove the state of a delegation token */ + @SuppressWarnings("unchecked") public synchronized void removeRMDelegationToken( RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) { - try { - removeRMDelegationTokenState(rmDTIdentifier); - } catch (Exception e) { - notifyStoreOperationFailed(e); - } + dispatcher.getEventHandler().handle( + new RMStateStoreRemoveRMDelegationTokenEvent( + rmDTIdentifier, sequenceNumber)); } /** @@ -669,6 +684,57 @@ protected void handleStoreEvent(RMStateStoreEvent event) { LOG.error("Error removing app: " + appId, e); notifyStoreOperationFailed(e); } + } else if (event.getType().equals(RMStateStoreEventType.STORE_RM_DT_SN)) { + RMStateStoreRMDelegationTokenAndSequenceNumberEvent rmDTAndSNEvent = + (RMStateStoreRMDelegationTokenAndSequenceNumberEvent) event; + Exception storedException = null; + LOG.info("Storing info for delegation token: " + + rmDTAndSNEvent.getRmDTIdentifier()); + try { + storeRMDelegationTokenAndSequenceNumberState( + rmDTAndSNEvent.getRmDTIdentifier(), rmDTAndSNEvent.getRenewDate(), + rmDTAndSNEvent.getSequenceNumber()); + notifyDoneStoringRMDelegationTokenAndSequenceNumber( + rmDTAndSNEvent.getRmDTIdentifier(), storedException); + } catch (Exception e) { + LOG.error("Error storing delegation token: " + + rmDTAndSNEvent.getRmDTIdentifier(), e); + notifyStoreOperationFailed(e); + } + } else if (event.getType().equals(RMStateStoreEventType.REMOVE_RM_DT_SN)) { + RMStateStoreRemoveRMDelegationTokenEvent removeRmDTEvent = + (RMStateStoreRemoveRMDelegationTokenEvent) event; + Exception removedException = null; + LOG.info("Removing info for delegation token: " + + removeRmDTEvent.getRmDTIdentifier()); + try { + removeRMDelegationTokenState(removeRmDTEvent.getRmDTIdentifier()); + notifyDoneRemovingRMDelegationToken( + removeRmDTEvent.getRmDTIdentifier(), removedException); + } catch (Exception e) { + LOG.error("Error removing delegation token: " + + removeRmDTEvent.getRmDTIdentifier(), e); + notifyStoreOperationFailed(e); + } + } else if (event.getType().equals(RMStateStoreEventType.UPDATE_RM_DT_SN)) { + RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent updateRmDTAndSNEvent = + (RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent) event; + Exception updatedException = null; + LOG.info("Updating info for delegation token: " + + updateRmDTAndSNEvent.getRmDTIdentifier()); + try { + removeRMDelegationTokenState(updateRmDTAndSNEvent.getRmDTIdentifier()); + storeRMDelegationTokenAndSequenceNumberState( + updateRmDTAndSNEvent.getRmDTIdentifier(), + updateRmDTAndSNEvent.getRenewDate(), + updateRmDTAndSNEvent.getSequenceNumber()); + notifyDoneUpdatingRMDelegationTokenAndSequenceNumber( + updateRmDTAndSNEvent.getRmDTIdentifier(), updatedException); + } catch (Exception e) { + LOG.error("Error updating delegation token: " + + updateRmDTAndSNEvent.getRmDTIdentifier(), e); + notifyStoreOperationFailed(e); + } } else { LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); } @@ -731,6 +797,60 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId } /** + * In (@link handleStoreEvent}, this method is called to notify the + * RMDelegationTokenSecretManager that new delegation token is stored in state + * store + * @param rmDTIdentifier identifier of the delegation token that has been saved + * @param storedException the exception that is thrown when storing the + * delegation token + */ + @SuppressWarnings("unchecked") + private void notifyDoneStoringRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, + Exception storedException) { + //TODO: YARN-1397 will clean the exception dependency + rmDispatcher.getEventHandler().handle( + new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_STORED, + rmDTIdentifier, storedException)); + } + + /** + * In (@link handleStoreEvent}, this method is called to notify the + * RMDelegationTokenSecretManager that new delegation token is removed from state + * store + * @param rmDTIdentifier identifier of the delegation token that has been removed + * @param storedException the exception that is thrown when removing the + * delegation token + */ + @SuppressWarnings("unchecked") + private void notifyDoneRemovingRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, + Exception storedException) { + //TODO: YARN-1397 will clean the exception dependency + rmDispatcher.getEventHandler().handle( + new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_REMOVED, + rmDTIdentifier, storedException)); + } + + /** + * In (@link handleStoreEvent}, this method is called to notify the + * RMDelegationTokenSecretManager that the delegation token is updated in state + * store + * @param rmDTIdentifier identifier of the delegation token that has been updated + * @param storedException the exception that is thrown when updating the + * delegation token + */ + @SuppressWarnings("unchecked") + private void notifyDoneUpdatingRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, + Exception updatedException) { + //TODO: YARN-1397 will clean the exception dependency + rmDispatcher.getEventHandler().handle( + new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_UPDATED, + rmDTIdentifier, updatedException)); + } + + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 903f4e7..0339fa9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -23,5 +23,8 @@ STORE_APP, UPDATE_APP, UPDATE_APP_ATTEMPT, - REMOVE_APP + REMOVE_APP, + STORE_RM_DT_SN, + REMOVE_RM_DT_SN, + UPDATE_RM_DT_SN } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java new file mode 100644 index 0000000..7dc266f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +public class RMStateStoreRMDelegationTokenAndSequenceNumberEvent + extends RMStateStoreEvent { + + private final RMDelegationTokenIdentifier rmDTIdentifier; + private final Long renewDate; + private final int sequenceNumber; + + public RMStateStoreRMDelegationTokenAndSequenceNumberEvent( + RMDelegationTokenIdentifier rmDTIdentifier, + Long renewDate, int sequenceNumber) { + super(RMStateStoreEventType.STORE_RM_DT_SN); + this.rmDTIdentifier = rmDTIdentifier; + this.renewDate = renewDate; + this.sequenceNumber = sequenceNumber; + } + + public RMDelegationTokenIdentifier getRmDTIdentifier() { + return rmDTIdentifier; + } + + public Long getRenewDate() { + return renewDate; + } + + public int getSequenceNumber() { + return sequenceNumber; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationTokenEvent.java new file mode 100644 index 0000000..9d54373 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationTokenEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +public class RMStateStoreRemoveRMDelegationTokenEvent + extends RMStateStoreEvent { + + private final RMDelegationTokenIdentifier rmDTIdentifier; + private final int sequenceNumber; + + public RMStateStoreRemoveRMDelegationTokenEvent( + RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) { + super(RMStateStoreEventType.REMOVE_RM_DT_SN); + this.rmDTIdentifier = rmDTIdentifier; + this.sequenceNumber = sequenceNumber; + } + + public RMDelegationTokenIdentifier getRmDTIdentifier() { + return rmDTIdentifier; + } + + public int getSequenceNumber() { + return sequenceNumber; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java new file mode 100644 index 0000000..47b713c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +public class RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent + extends RMStateStoreEvent { + + private final RMDelegationTokenIdentifier rmDTIdentifier; + private final Long renewDate; + private final int sequenceNumber; + + public RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent( + RMDelegationTokenIdentifier rmDTIdentifier, + Long renewDate, int sequenceNumber) { + super(RMStateStoreEventType.UPDATE_RM_DT_SN); + this.rmDTIdentifier = rmDTIdentifier; + this.renewDate = renewDate; + this.sequenceNumber = sequenceNumber; + } + + public RMDelegationTokenIdentifier getRmDTIdentifier() { + return rmDTIdentifier; + } + + public Long getRenewDate() { + return renewDate; + } + + public int getSequenceNumber() { + return sequenceNumber; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java new file mode 100644 index 0000000..b4ad11d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + + +public class RMDelegationTokenEvent extends + AbstractEvent { + + private RMDelegationTokenIdentifier rmDTIdentifier; + private Exception e; + + public RMDelegationTokenEvent(RMDelegationTokenEventType type, + RMDelegationTokenIdentifier rmDTIdentifier, Exception e) { + super(type); + this.rmDTIdentifier = rmDTIdentifier; + this.e = e; + } + + public RMDelegationTokenIdentifier getRMDelegationTokenIdentifier() { + return rmDTIdentifier; + } + + public Exception getException() { + return e; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java new file mode 100644 index 0000000..9ac2c2c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + + +public enum RMDelegationTokenEventType { + //Source: RMStateStore + DT_STORED, + DT_REMOVED, + DT_UPDATED +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index 23939de..477004e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -23,15 +23,22 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.RMDelegationTokenOperationListener; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -48,11 +55,16 @@ @InterfaceStability.Unstable public class RMDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager implements - Recoverable { + Recoverable, RMDelegationTokenOperationListener { private static final Log LOG = LogFactory .getLog(RMDelegationTokenSecretManager.class); protected final RMContext rmContext; + protected final ConcurrentMap dtOperations = + new ConcurrentHashMap(); + protected final long dtOperationsCleanupInterval; + protected final long dtOperationsCleanupDelay; + protected final Thread dtOperationsCleaner; /** * Create a secret manager @@ -63,15 +75,48 @@ * @param delegationTokenRenewInterval how often the tokens must be renewed * @param delegationTokenRemoverScanInterval how often the tokens are scanned * for expired tokens + * @param rmContext + * @param conf */ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, long delegationTokenRemoverScanInterval, - RMContext rmContext) { + RMContext rmContext, + Configuration conf) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); this.rmContext = rmContext; + dtOperationsCleanupInterval = conf.getLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS); + dtOperationsCleanupDelay = conf.getLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS); + dtOperationsCleaner = new Daemon(new Runnable() { + @Override + public void run() { + while (true) { + try { + Thread.sleep(dtOperationsCleanupInterval); + } catch (InterruptedException e) { + LOG.warn( + "The thread of cleanup finished delegation token operations is interrupted"); + return; + } + for (DelegationTokenOperationKey key : dtOperations.keySet()) { + DelegationTokenOperationValue value = dtOperations.get(key); + if (value != null && + value.getState() == DelegationTokenOperationState.FINISHED) { + long delay = System.currentTimeMillis() - value.getFinishTime(); + if (delay >= dtOperationsCleanupDelay) { + dtOperations.remove(key); + } + } + } + } + } + }); } @Override @@ -122,9 +167,7 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id, try { LOG.info("updating RMDelegation token with sequence number: " + id.getSequenceNumber()); - rmContext.getStateStore().removeRMDelegationToken(id, - delegationTokenSequenceNumber); - rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id, + rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id, renewDate, id.getSequenceNumber()); } catch (Exception e) { LOG.error("Error in updating persisted RMDelegationToken with sequence number: " @@ -195,4 +238,261 @@ public void recover(RMState rmState) throws Exception { addPersistedDelegationToken(entry.getKey(), entry.getValue()); } } + + @Override + public DelegationTokenOperationState getAndInitGetDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier) { + return getAndInitDelegationTokenOperationState( + rmDTIdentifier, DelegationTokenOperationType.GET); + } + + @Override + public DelegationTokenOperationState getAndInitCancelDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier) { + return getAndInitDelegationTokenOperationState( + rmDTIdentifier, DelegationTokenOperationType.CANCEL); + } + + @Override + public DelegationTokenOperationState getAndInitRenewDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier) { + return getAndInitDelegationTokenOperationState( + rmDTIdentifier, DelegationTokenOperationType.RENEW); + } + + protected DelegationTokenOperationState getAndInitDelegationTokenOperationState( + RMDelegationTokenIdentifier rmDTIdentifier, + DelegationTokenOperationType type) { + DelegationTokenOperationKey key = new DelegationTokenOperationKey( + rmDTIdentifier, type); + DelegationTokenOperationValue value = dtOperations.putIfAbsent( + key, new DelegationTokenOperationValue()); + if (value == null) { + return DelegationTokenOperationState.NEW; + } else { + return value.getState(); + } + } + + @SuppressWarnings("unchecked") + @Override + public Token pollGetDelegationTokenResponse( + RMDelegationTokenIdentifier rmDTIdentifier) { + Object obj = pollDelegationTokenOperationResponse( + rmDTIdentifier, DelegationTokenOperationType.GET); + assert obj instanceof Token; + return (Token) obj; + } + + @Override + public void pollCancelDelegationTokenResponse( + RMDelegationTokenIdentifier rmDTIdentifier) { + pollDelegationTokenOperationResponse( + rmDTIdentifier, DelegationTokenOperationType.CANCEL); + } + + @Override + public long pollRenewDelegationTokenResponse( + RMDelegationTokenIdentifier rmDTIdentifier) { + Object obj = pollDelegationTokenOperationResponse( + rmDTIdentifier, DelegationTokenOperationType.RENEW); + assert obj instanceof Long; + return (Long) obj; + } + + protected Object pollDelegationTokenOperationResponse( + RMDelegationTokenIdentifier rmDTIdentifier, + DelegationTokenOperationType type) { + DelegationTokenOperationKey key = new DelegationTokenOperationKey( + rmDTIdentifier, type); + DelegationTokenOperationValue value = dtOperations.remove(key); + if (value == null) { + return null; + } else { + return value.getResponse(); + } + } + + @Override + public synchronized long renewToken(Token token, + String renewer) throws InvalidToken, IOException { + long renewTime = super.renewToken(token, renewer); + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + DelegationTokenOperationValue value = getDelegationTokenOperationValue( + rmDTIdentifier, DelegationTokenOperationType.RENEW); + if (value != null) { + value.setResponse(renewTime); + } + return renewTime; + } + + public synchronized void holdToken( + RMDelegationTokenIdentifier rmDTIdentifier, + Token token) { + DelegationTokenOperationValue value = getDelegationTokenOperationValue( + rmDTIdentifier, DelegationTokenOperationType.GET); + if (value != null) { + value.setResponse(token); + } + } + + @Override + public void handle(RMDelegationTokenEvent event) { + DelegationTokenOperationValue value; + switch (event.getType()) { + case DT_STORED: + value = getDelegationTokenOperationValue( + event.getRMDelegationTokenIdentifier(), + DelegationTokenOperationType.GET); + if (value != null) { + value.setState(DelegationTokenOperationState.FINISHED); + } + break; + case DT_REMOVED: + value = getDelegationTokenOperationValue( + event.getRMDelegationTokenIdentifier(), + DelegationTokenOperationType.CANCEL); + if (value != null) { + value.setState(DelegationTokenOperationState.FINISHED); + } + break; + case DT_UPDATED: + value = getDelegationTokenOperationValue( + event.getRMDelegationTokenIdentifier(), + DelegationTokenOperationType.RENEW); + if (value != null) { + value.setState(DelegationTokenOperationState.FINISHED); + } + break; + default: + LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + } + } + + @Override + public void startThreads() throws IOException { + super.startThreads(); + dtOperationsCleaner.start(); + } + + @Override + public void stopThreads() { + super.stopThreads(); + if (LOG.isDebugEnabled()) + LOG.debug("Stopping delegation token operation cleaner thread"); + running = false; + + if (dtOperationsCleaner != null) { + synchronized (noInterruptsLock) { + dtOperationsCleaner.interrupt(); + } + try { + dtOperationsCleaner.join(); + } catch (InterruptedException e) { + throw new RuntimeException( + "Unable to join on delegation token operation cleaner thread", e); + } + } + } + + protected DelegationTokenOperationValue getDelegationTokenOperationValue( + RMDelegationTokenIdentifier rmDTIdentifier, + DelegationTokenOperationType type) { + DelegationTokenOperationKey key = new DelegationTokenOperationKey( + rmDTIdentifier, type); + DelegationTokenOperationValue value = dtOperations.get(key); + if (value == null) { + LOG.error("The delegation token operation " + type + + " of the identifier " + rmDTIdentifier + " is missing"); + } + return value; + } + + protected static class DelegationTokenOperationKey { + private RMDelegationTokenIdentifier rmDTIdentifier; + private DelegationTokenOperationType type; + + public DelegationTokenOperationKey( + RMDelegationTokenIdentifier rmDTIdentifier, + DelegationTokenOperationType type) { + this.rmDTIdentifier = rmDTIdentifier; + this.type = type; + } + + @Override + public boolean equals(Object obj) { + // generated by eclipse + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + DelegationTokenOperationKey other = (DelegationTokenOperationKey) obj; + if (rmDTIdentifier == null) { + if (other.rmDTIdentifier != null) + return false; + } else if (!( + // use ower/real user/renewer only to identify a DT operation + rmDTIdentifier.getOwner().equals(other.rmDTIdentifier.getOwner()) && + rmDTIdentifier.getRealUser().equals(other.rmDTIdentifier.getRealUser()) && + rmDTIdentifier.getRenewer().equals(other.rmDTIdentifier.getRenewer()))) + return false; + if (type != other.type) + return false; + return true; + } + + @Override + public int hashCode() { + // generated by eclipse + final int prime = 31; + int result = 1; + // use ower/real user/renewer only to identify a DT operation + String rmDTIdentifierStr = rmDTIdentifier.getOwner().toString() + + rmDTIdentifier.getRealUser().toString() + + rmDTIdentifier.getRenewer().toString(); + result = + prime * result + + ((rmDTIdentifierStr == null) ? 0 : rmDTIdentifierStr.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + } + + protected static class DelegationTokenOperationValue { + private Object response; + private DelegationTokenOperationState state; + private long finishTime; + + public DelegationTokenOperationValue() { + state = DelegationTokenOperationState.PROGRESSING; + finishTime = Long.MAX_VALUE; + } + + public Object getResponse() { + return response; + } + + public void setResponse(Object response) { + this.response = response; + } + + public DelegationTokenOperationState getState() { + return state; + } + + public void setState(DelegationTokenOperationState state) { + this.state = state; + if (state == DelegationTokenOperationState.FINISHED) { + finishTime = System.currentTimeMillis(); + } + } + + public long getFinishTime() { + return finishTime; + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 7c49681..69e7895 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -41,10 +41,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; -import com.google.common.collect.Sets; import junit.framework.Assert; -import org.apache.commons.lang.math.LongRange; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -54,15 +52,20 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -75,8 +78,8 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; @@ -87,7 +90,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -95,6 +98,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -104,6 +108,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestClientRMService { private static final Log LOG = LogFactory.getLog(TestClientRMService.class); @@ -120,9 +126,20 @@ @BeforeClass public static void setupSecretManager() throws IOException { + Configuration conf = new YarnConfiguration(); RMContext rmContext = mock(RMContext.class); - when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); - dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); + RMStateStore store = new MemoryRMStateStore(); + store.init(conf); + store.start(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + store.setRMDispatcher(dispatcher); + when(rmContext.getStateStore()).thenReturn(store); + + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, + rmContext, conf); + dispatcher.register(RMDelegationTokenEventType.class, dtsm); dtsm.startThreads(); } @@ -345,6 +362,76 @@ private void checkTokenRenewal(UserGroupInformation owner, rmService.renewDelegationToken(request); } + @SuppressWarnings("resource") + @Test + public void testGetRnewCancelDelegationToken() throws Exception { + RMContext rmContext = mock(RMContext.class); + final ClientRMService rmService = new ClientRMService( + rmContext, null, null, null, null, dtsm); + + int count = 0; + int maxCount = 10; + GetDelegationTokenResponse getResp = null; + for (; count < maxCount; ++count) { + getResp = owner.doAs( + new PrivilegedExceptionAction() { + @Override + public GetDelegationTokenResponse run() throws Exception { + GetDelegationTokenRequest request = + GetDelegationTokenRequest.newInstance(owner.getUserName()); + return rmService.getDelegationToken(request); + } + }); + if (getResp.getIsObtained()) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertTrue(count > 0); + Assert.assertTrue(count < 10); + + final org.apache.hadoop.yarn.api.records.Token token = + getResp.getRMDelegationToken(); + for (count = 0; count < maxCount; ++count) { + RenewDelegationTokenResponse renewResp = + owner.doAs(new PrivilegedExceptionAction() { + @Override + public RenewDelegationTokenResponse run() throws Exception { + RenewDelegationTokenRequest request = + RenewDelegationTokenRequest.newInstance(token); + return rmService.renewDelegationToken(request); + } + }); + if (renewResp.getIsRenewed()) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertTrue(count > 0); + Assert.assertTrue(count < 10); + + for (count = 0; count < maxCount; ++count) { + CancelDelegationTokenResponse cancelResp = + owner.doAs(new PrivilegedExceptionAction() { + @Override + public CancelDelegationTokenResponse run() throws Exception { + CancelDelegationTokenRequest request = + CancelDelegationTokenRequest.newInstance(token); + return rmService.cancelDelegationToken(request); + } + }); + if (cancelResp.getIsCanceled()) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertTrue(count > 0); + Assert.assertTrue(count < 10); + } + @Test (timeout = 30000) @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index d389c0e..a50458e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -55,15 +56,20 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.RMDelegationTokenOperationListener.DelegationTokenOperationState; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -82,7 +88,7 @@ public void resetSecretManager() { RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null); } - @Test + @Test (timeout = 30000) public void testDelegationToken() throws IOException, InterruptedException { final YarnConfiguration conf = new YarnConfiguration(); @@ -295,6 +301,7 @@ private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, RMDelegationTokenSecretManager secretManager = mock(RMDelegationTokenSecretManager.class); RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr); + mockRMDelegationTokenOperationListenerMethods(secretManager); RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier( new Text("owner"), new Text("renewer"), null); @@ -306,6 +313,7 @@ private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, token.renew(conf); verify(secretManager).renewToken(eq(token), eq("renewer")); reset(secretManager); + mockRMDelegationTokenOperationListenerMethods(secretManager); token.cancel(conf); verify(secretManager).cancelToken(eq(token), eq("renewer")); } else { @@ -325,6 +333,20 @@ private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, verify(secretManager, never()).cancelToken(any(Token.class), anyString()); } } + + private static void mockRMDelegationTokenOperationListenerMethods( + RMDelegationTokenSecretManager secretManager) { + when(secretManager.getAndInitRenewDelegationTokenOperationState( + any(RMDelegationTokenIdentifier.class))).thenReturn( + DelegationTokenOperationState.FINISHED); + when(secretManager.getAndInitCancelDelegationTokenOperationState( + any(RMDelegationTokenIdentifier.class))).thenReturn( + DelegationTokenOperationState.FINISHED); + when(secretManager.pollRenewDelegationTokenResponse( + any(RMDelegationTokenIdentifier.class))).thenReturn(0L); + doNothing().when(secretManager).pollCancelDelegationTokenResponse( + any(RMDelegationTokenIdentifier.class)); + } @SuppressWarnings("rawtypes") public static class YarnBadRPC extends YarnRPC { @@ -362,8 +384,19 @@ public Server getServer(Class protocol, Object instance, GetDelegationTokenRequest request = Records .newRecord(GetDelegationTokenRequest.class); request.setRenewer(renewerString); - return clientRMService.getDelegationToken(request) - .getRMDelegationToken(); + do { + GetDelegationTokenResponse response = + clientRMService.getDelegationToken(request); + if (response.getIsObtained()) { + return response.getRMDelegationToken(); + } else { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + fail(); + } + } + } while (true); } }); return token; @@ -465,11 +498,20 @@ private static ResourceScheduler createMockScheduler(Configuration conf) { createRMDelegationTokenSecretManager(long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { RMContext rmContext = mock(RMContext.class); - when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + Configuration conf = new YarnConfiguration(); + RMStateStore store = new MemoryRMStateStore(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + store.init(conf); + store.start(); + when(rmContext.getStateStore()).thenReturn(store); RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, - tokenRenewInterval, 3600000, rmContext); + tokenRenewInterval, 3600000, rmContext, conf); + dispatcher.register(RMDelegationTokenEventType.class, rmDtSecretManager); + store.setRMDispatcher(dispatcher); return rmDtSecretManager; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index d50f0d7..bdf9078 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -47,6 +47,8 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -54,6 +56,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -93,6 +97,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mortbay.log.Log; public class TestRMRestart { @@ -1192,7 +1197,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 30000) public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set( @@ -1223,8 +1228,15 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { GetDelegationTokenRequest.newInstance("renewer1"); UserGroupInformation.getCurrentUser().setAuthenticationMethod( AuthMethod.KERBEROS); - GetDelegationTokenResponse response1 = - rm1.getClientRMService().getDelegationToken(request1); + GetDelegationTokenResponse response1; + do { + response1 = rm1.getClientRMService().getDelegationToken(request1); + if (response1.getIsObtained()) { + break; + } else { + Thread.sleep(200); + } + } while (true); org.apache.hadoop.yarn.api.records.Token delegationToken1 = response1.getRMDelegationToken(); Token token1 = @@ -1262,8 +1274,15 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // request one more token GetDelegationTokenRequest request2 = GetDelegationTokenRequest.newInstance("renewer2"); - GetDelegationTokenResponse response2 = - rm1.getClientRMService().getDelegationToken(request2); + GetDelegationTokenResponse response2; + do { + response2 = rm1.getClientRMService().getDelegationToken(request2); + if (response2.getIsObtained()) { + break; + } else { + Thread.sleep(200); + } + } while (true); org.apache.hadoop.yarn.api.records.Token delegationToken2 = response2.getRMDelegationToken(); Token token2 = @@ -1271,12 +1290,17 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier(); // cancel token2 - try{ - rm1.getRMDTSecretManager().cancelToken(token2, - UserGroupInformation.getCurrentUser().getUserName()); - } catch(Exception e) { - Assert.fail(); - } + CancelDelegationTokenRequest request3 = + CancelDelegationTokenRequest.newInstance(delegationToken2); + do { + CancelDelegationTokenResponse response3 = + rm1.getClientRMService().cancelDelegationToken(request3); + if (response3.getIsCanceled()) { + break; + } else { + Thread.sleep(200); + } + } while (true); // Assert the token which has the latest delegationTokenSequenceNumber is removed Assert.assertEquals( @@ -1301,16 +1325,24 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), rm2.getRMDTSecretManager().getLatestDTSequenceNumber()); + // Sleep for a while to ensure the previous renew is done + Thread.sleep(500); // renewDate before renewing Long renewDateBeforeRenew = allTokensRM2.get(dtId1); - try{ - // Sleep for one millisecond to make sure renewDataAfterRenew is greater - Thread.sleep(1); - // renew recovered token - rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); - } catch(Exception e) { - Assert.fail(); - } + // Sleep for one millisecond to make sure renewDataAfterRenew is greater + Thread.sleep(1); + // renew recovered token + RenewDelegationTokenRequest request4 = + RenewDelegationTokenRequest.newInstance(delegationToken1); + do { + RenewDelegationTokenResponse response4 = + rm2.getClientRMService().renewDelegationToken(request4); + if (response4.getIsRenewed()) { + break; + } else { + Thread.sleep(200); + } + } while (true); allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); Long renewDateAfterRenew = allTokensRM2.get(dtId1); @@ -1322,12 +1354,17 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // assert old token is removed from state store Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); - try{ - rm2.getRMDTSecretManager().cancelToken(token1, - UserGroupInformation.getCurrentUser().getUserName()); - } catch(Exception e) { - Assert.fail(); - } + CancelDelegationTokenRequest request5 = + CancelDelegationTokenRequest.newInstance(delegationToken1); + do { + CancelDelegationTokenResponse response5 = + rm2.getClientRMService().cancelDelegationToken(request5); + if (response5.getIsCanceled()) { + break; + } else { + Thread.sleep(200); + } + } while (true); // assert token is removed from state after its cancelled allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); @@ -1341,7 +1378,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // This is to test submit an application to the new RM with the old delegation // token got from previous RM. - @Test + @Test (timeout = 30000) public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -1359,8 +1396,15 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() GetDelegationTokenRequest.newInstance("renewer1"); UserGroupInformation.getCurrentUser().setAuthenticationMethod( AuthMethod.KERBEROS); - GetDelegationTokenResponse response1 = - rm1.getClientRMService().getDelegationToken(request1); + GetDelegationTokenResponse response1; + do { + response1 = rm1.getClientRMService().getDelegationToken(request1); + if (response1.getIsObtained()) { + break; + } else { + Thread.sleep(200); + } + } while (true); Token token1 = ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 30cdbc1..512c2b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -69,7 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; -public class RMStateStoreTestBase extends ClientBaseWithFixes{ +public class RMStateStoreTestBase extends ClientBaseWithFixes { public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); @@ -349,7 +349,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) store.close(); } - public void testRMDTSecretManagerStateStore( + void testRMDTSecretManagerStateStore( RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); @@ -373,6 +373,9 @@ public void testRMDTSecretManagerStateStore( keySet.add(key); store.storeRMDTMasterKey(key); + // let things settle down + Thread.sleep(1000); + RMDTSecretManagerState secretManagerState = store.loadState().getRMDTSecretManagerState(); Assert.assertEquals(token1, secretManagerState.getTokenState()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 3b5add8..f0bce02 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.RMDelegationTokenOperationListener.DelegationTokenOperationType; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -97,10 +98,17 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { // request to generate a RMDelegationToken GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); when(request.getRenewer()).thenReturn("renewer1"); - GetDelegationTokenResponse response = - rm1.getClientRMService().getDelegationToken(request); - org.apache.hadoop.yarn.api.records.Token delegationToken = - response.getRMDelegationToken(); + org.apache.hadoop.yarn.api.records.Token delegationToken; + do { + GetDelegationTokenResponse response = + rm1.getClientRMService().getDelegationToken(request); + if (response.getIsObtained()) { + delegationToken = response.getRMDelegationToken(); + break; + } else { + Thread.sleep(200); + } + } while(true); Token token1 = ConverterUtils.convertFromYarn(delegationToken, null); RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); @@ -160,6 +168,47 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { } } + @SuppressWarnings("resource") + @Test + public void testCleanupOutstandingFinishedDTOperations() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + conf.setLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS, + 1000); + conf.setLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS, + 500); + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + + for (DelegationTokenOperationType type : DelegationTokenOperationType.values()) { + RMDelegationTokenIdentifier rmDTIdentifier = new RMDelegationTokenIdentifier(); + RMDelegationTokenSecretManager.DelegationTokenOperationKey key = + new RMDelegationTokenSecretManager.DelegationTokenOperationKey( + rmDTIdentifier, type); + RMDelegationTokenSecretManager.DelegationTokenOperationValue value = + new RMDelegationTokenSecretManager.DelegationTokenOperationValue(); + dtSecretManager.dtOperations.put(key, value); + } + Assert.assertEquals(DelegationTokenOperationType.values().length, + dtSecretManager.dtOperations.size()); + for (RMDelegationTokenSecretManager.DelegationTokenOperationValue value : + dtSecretManager.dtOperations.values()) { + value.setState( + RMDelegationTokenSecretManager.DelegationTokenOperationState.FINISHED); + } + + // wait for cleanup + Thread.sleep(2000); + + Assert.assertEquals(0, dtSecretManager.dtOperations.size()); + + rm1.stop(); + } + class MyMockRM extends TestSecurityMockRM { public MyMockRM(Configuration conf, RMStateStore store) { @@ -177,7 +226,7 @@ protected RMSecretManagerService createRMSecretManagerService() { // KeyUpdateInterval-> 1 seconds // TokenMaxLifetime-> 2 seconds. return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000, - rmContext); + rmContext, conf); } }; } @@ -190,10 +239,11 @@ protected RMSecretManagerService createRMSecretManagerService() { public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval, RMContext rmContext) { + long delegationTokenRemoverScanInterval, RMContext rmContext, + Configuration conf) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval, - rmContext); + rmContext, conf); } @Override