diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java index aeb20cd..eec4537 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java @@ -104,6 +104,7 @@ public void testClusterGetDelegationToken() throws Exception { rmDTToken.setKind("Testclusterkind"); rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes())); rmDTToken.setService("0.0.0.0:8032"); + getDTResponse.setIsObtained(true); getDTResponse.setRMDelegationToken(rmDTToken); final ApplicationClientProtocol cRMProtocol = mock(ApplicationClientProtocol.class); when(cRMProtocol.getDelegationToken(any( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 0abaafb..7049415 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -298,13 +298,22 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls( /** *

The interface used by clients to get delegation token, enabling the - * containers to be able to talk to the service using those tokens. - * - *

The ResourceManager responds with the delegation - * {@link Token} that can be used by the client to speak to this - * service. + * containers to be able to talk to the service using those tokens.

+ * + *

+ * The response includes: + *

+ * Note: users have to wait until this flag becomes true to get the + * {@link Token}. + *

* @param request request to get a delegation token for the client. - * @return delegation token that can be used to talk to this service + * @return the response including a flag and a delegation token that can be + * used to talk to this service * @throws YarnException * @throws IOException */ @@ -316,9 +325,20 @@ public GetDelegationTokenResponse getDelegationToken( /** * Renew an existing delegation {@link Token}. - * + * + *

+ * The response includes: + *

+ * Note: users have to wait until this flag becomes true to get new expiry + * time. + *

* @param request the delegation token to be renewed. - * @return the new expiry time for the delegation token. + * @return the response including a flag and a new expiry time for the + * delegation token. * @throws YarnException * @throws IOException */ @@ -330,9 +350,18 @@ public RenewDelegationTokenResponse renewDelegationToken( /** * Cancel an existing delegation {@link Token}. - * + * + *

+ * The response includes: + *

+ * Note: users have to wait until this flag becomes true to confirm + * the delegation token is canceled. + *

* @param request the delegation token to be cancelled. - * @return an empty response. + * @return the response including a flag. * @throws YarnException * @throws IOException */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java index 495d03e..d605c60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; @@ -31,9 +33,27 @@ public abstract class CancelDelegationTokenResponse { @Private @Unstable - public static CancelDelegationTokenResponse newInstance() { + public static CancelDelegationTokenResponse newInstance(boolean isCanceled) { CancelDelegationTokenResponse response = Records.newRecord(CancelDelegationTokenResponse.class); + response.setIsCanceled(isCanceled); return response; } + + /** + * Get the flag which indicates that the process of canceling delegation + * token is completed or not. + */ + @Public + @Stable + public abstract boolean getIsCanceled(); + + /** + * Set the flag which indicates that the process of canceling delegation + * token is completed or not. + */ + @Private + @Unstable + public abstract void setIsCanceled(boolean isCanceled); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java index bb7fcf0..9613619 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java @@ -39,14 +39,32 @@ @Private @Unstable - public static GetDelegationTokenResponse newInstance(Token rmDTToken) { + public static GetDelegationTokenResponse newInstance( + boolean isObtained, Token rmDTToken) { GetDelegationTokenResponse response = Records.newRecord(GetDelegationTokenResponse.class); + response.setIsObtained(isObtained); response.setRMDelegationToken(rmDTToken); return response; } /** + * Get the flag which indicates that the process of getting delegation + * token is completed or not. + */ + @Public + @Stable + public abstract boolean getIsObtained(); + + /** + * Set the flag which indicates that the process of getting delegation + * token is completed or not. + */ + @Private + @Unstable + public abstract void setIsObtained(boolean isObtained); + + /** * The Delegation tokens have a identifier which maps to * {@link AbstractDelegationTokenIdentifier}. * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java index f0f7fad..5c449f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; @@ -31,18 +33,43 @@ @Private @Unstable - public static RenewDelegationTokenResponse newInstance(long expTime) { + public static RenewDelegationTokenResponse newInstance( + boolean isRenewed, long expTime) { RenewDelegationTokenResponse response = Records.newRecord(RenewDelegationTokenResponse.class); + response.setIsRenewed(isRenewed); response.setNextExpirationTime(expTime); return response; } + /** + * Get the flag which indicates that the process of renewing delegation + * token is completed or not. + */ + @Public + @Stable + public abstract boolean getIsRenewed(); + + /** + * Set the flag which indicates that the process of renewing delegation + * token is completed or not. + */ + @Private + @Unstable + public abstract void setIsRenewed(boolean isRenewed); + + /** + * Get the new expiration time. + */ @Private @Unstable public abstract long getNextExpirationTime(); + /** + * Set the new expiration time + */ @Private @Unstable public abstract void setNextExpirationTime(long expTime); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04ecea6..17625f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -605,12 +605,24 @@ RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; - + /** Delegation Token renewer thread count */ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = RM_PREFIX + "delegation-token-renewer.thread-count"; public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50; + /** Interval at which the thread of cleanup delegation token operations runs */ + public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS = + RM_PREFIX + "delegation-token-operation.cleanup-interval-ms"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS = + 60000l; + + /** The max life time of a delegation token operation after its finish before its cleanup */ + public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS = + RM_PREFIX + "delegation-token-operation.cleanup-delay-ms"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS = + 30000l; + /** Whether to enable log aggregation */ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto index 7ad06c9..5a05b2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto @@ -32,8 +32,8 @@ service ApplicationHistoryProtocolService { rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto); rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto); rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto); - rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto); - rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto); - rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto); + rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.yarn.GetDelegationTokenResponseProto); + rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.yarn.RenewDelegationTokenResponseProto); + rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.yarn.CancelDelegationTokenResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index eda2641..88cc30d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -41,9 +41,9 @@ service ApplicationClientProtocolService { rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto); rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto); rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto); - rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto); - rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto); - rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto); + rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.yarn.GetDelegationTokenResponseProto); + rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.yarn.RenewDelegationTokenResponseProto); + rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.yarn.CancelDelegationTokenResponseProto); rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index eff5cd7..f7674cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -249,6 +249,20 @@ message GetContainerStatusesResponseProto { repeated ContainerExceptionMapProto failed_requests = 2; } +message GetDelegationTokenResponseProto { + optional hadoop.common.TokenProto token = 1; + optional bool is_obtained = 2 [default = false]; +} + +message RenewDelegationTokenResponseProto { + optional uint64 newExpiryTime = 1; + optional bool is_renewed = 2 [default = false]; +} + +message CancelDelegationTokenResponseProto { + optional bool is_canceled = 2 [default = false]; +} + ////////////////////////////////////////////////////// /////// Application_History_Protocol ///////////////// ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 51a7353..9d9a4ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -355,8 +355,25 @@ public Token getRMDelegationToken(Text renewer) GetDelegationTokenRequest rmDTRequest = Records.newRecord(GetDelegationTokenRequest.class); rmDTRequest.setRenewer(renewer.toString()); - GetDelegationTokenResponse response = - rmClient.getDelegationToken(rmDTRequest); + + GetDelegationTokenResponse response; + int pollCount = 0; + while (true) { + response = rmClient.getDelegationToken(rmDTRequest); + if (response.getIsObtained()) { + break; + } + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Get the RM delegation token is not finished"); + } + try { + Thread.sleep(asyncApiPollIntervalMillis); + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for getting the delegation token"); + } + } return response.getRMDelegationToken(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 7c34966..adc940f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.client.api.impl; -import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -38,12 +37,15 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -70,6 +72,7 @@ import org.apache.log4j.Logger; import org.junit.Test; + public class TestYarnClient { @Test @@ -93,7 +96,7 @@ public void testClientStop() { @SuppressWarnings("deprecation") @Test (timeout = 30000) - public void testSubmitApplication() { + public void testSubmitApplication() throws Exception { Configuration conf = new Configuration(); conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, 100); // speed up tests @@ -148,6 +151,21 @@ public void testKillApplication() throws Exception { .forceKillApplication(any(KillApplicationRequest.class)); } + @SuppressWarnings("resource") + @Test + public void testGetDelegationToken() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + 100); // speed up tests + final YarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + + client.getRMDelegationToken(new Text("test renewer")); + verify(((MockYarnClient) client).rmClient, times(5)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + } + @Test(timeout = 30000) public void testApplicationType() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -265,6 +283,18 @@ public void start() { Assert.fail("Exception is not expected."); } when(mockResponse.getApplicationReport()).thenReturn(mockReport); + + try { + when(rmClient.getDelegationToken( + any(GetDelegationTokenRequest.class))).thenReturn( + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(false, null), + GetDelegationTokenResponse.newInstance(true, null)); + } catch (Exception e) { + Assert.fail(); // shouldn't happen with Mockito + } } public ApplicationClientProtocol getRMClient() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 61068e8..4297106 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -22,16 +22,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; -import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; -import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; -import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; @@ -44,10 +41,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesRequestPBImpl; @@ -69,14 +66,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto; @@ -87,6 +86,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java index 4511cc4..f6aa0e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java @@ -22,11 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; -import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; -import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; -import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; import org.apache.hadoop.yarn.api.ApplicationHistoryProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -57,6 +54,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto; @@ -69,6 +67,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java index ec2b2b2..54d74c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java @@ -19,17 +19,21 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @Private @Unstable -public class CancelDelegationTokenResponsePBImpl extends CancelDelegationTokenResponse { +public class CancelDelegationTokenResponsePBImpl + extends CancelDelegationTokenResponse { CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto .getDefaultInstance(); + CancelDelegationTokenResponseProto.Builder builder = null; + boolean viaProto = false; public CancelDelegationTokenResponsePBImpl() { } @@ -39,6 +43,19 @@ public CancelDelegationTokenResponsePBImpl( this.proto = proto; } + @Override + public boolean getIsCanceled() { + CancelDelegationTokenResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsCanceled(); + } + + @Override + public void setIsCanceled(boolean isCanceled) { + maybeInitBuilder(); + builder.setIsCanceled(isCanceled); + } + public CancelDelegationTokenResponseProto getProto() { return proto; } @@ -62,4 +79,12 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CancelDelegationTokenResponseProto.newBuilder(proto); + } + viaProto = false; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java index 93f4b5b..4ba9b05 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java @@ -20,12 +20,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto; -import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProtoOrBuilder; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -72,6 +72,19 @@ public void setRMDelegationToken(Token appToken) { this.appToken = appToken; } + @Override + public boolean getIsObtained() { + GetDelegationTokenResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsObtained(); + } + + @Override + public void setIsObtained(boolean isObtained) { + maybeInitBuilder(); + builder.setIsObtained(isObtained); + } + public GetDelegationTokenResponseProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -128,4 +141,5 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java index 9d20b46..4f86bdf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java @@ -19,9 +19,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; -import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProtoOrBuilder; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -89,4 +89,18 @@ public void setNextExpirationTime(long expTime) { maybeInitBuilder(); builder.setNewExpiryTime(expTime); } + + @Override + public boolean getIsRenewed() { + RenewDelegationTokenResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsRenewed(); + } + + @Override + public void setIsRenewed(boolean isRenewed) { + maybeInitBuilder(); + builder.setIsRenewed(isRenewed); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java index 418ccb2..1db3cfd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java @@ -36,8 +36,11 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; @@ -98,12 +101,22 @@ public static void setSecretManager( public long renew(Token token, Configuration conf) throws IOException, InterruptedException { final ApplicationClientProtocol rmClient = getRmClient(token, conf); + long asyncApiPollIntervalMillis = + conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); if (rmClient != null) { try { - RenewDelegationTokenRequest request = - Records.newRecord(RenewDelegationTokenRequest.class); - request.setDelegationToken(convertToProtoToken(token)); - return rmClient.renewDelegationToken(request).getNextExpirationTime(); + while (true) { + RenewDelegationTokenRequest request = + Records.newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + RenewDelegationTokenResponse response = + rmClient.renewDelegationToken(request); + if (response.getIsRenewed()) { + return response.getNextExpirationTime(); + } + Thread.sleep(asyncApiPollIntervalMillis); + } } catch (YarnException e) { throw new IOException(e); } finally { @@ -120,12 +133,22 @@ public long renew(Token token, Configuration conf) throws IOException, public void cancel(Token token, Configuration conf) throws IOException, InterruptedException { final ApplicationClientProtocol rmClient = getRmClient(token, conf); + long asyncApiPollIntervalMillis = + conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); if (rmClient != null) { try { - CancelDelegationTokenRequest request = - Records.newRecord(CancelDelegationTokenRequest.class); - request.setDelegationToken(convertToProtoToken(token)); - rmClient.cancelDelegationToken(request); + while (true) { + CancelDelegationTokenRequest request = + Records.newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + CancelDelegationTokenResponse response = + rmClient.cancelDelegationToken(request); + if (response.getIsCanceled()) { + break; + } + Thread.sleep(asyncApiPollIntervalMillis); + } } catch (YarnException e) { throw new IOException(e); } finally { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5b3ede7..fa137cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -524,6 +524,20 @@ + Interval at which the thread of cleanup delegation token + operations runs + yarn.resourcemanager.delegation-token-operation.cleanup-interval-ms + 60000 + + + + The max life time of a delegation token operation after its + finish before its cleanup + yarn.resourcemanager.delegation-token-operation.cleanup-delay-ms + 30000 + + + Interval for the roll over for the master key used to generate application tokens diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index fdde381..243027f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -101,7 +101,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationState; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -128,7 +129,7 @@ private final RMAppManager rmAppManager; private Server server; - protected RMDelegationTokenSecretManager rmDTSecretManager; + protected RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); InetSocketAddress clientBindAddress; @@ -139,14 +140,14 @@ public ClientRMService(RMContext rmContext, YarnScheduler scheduler, RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager, QueueACLsManager queueACLsManager, - RMDelegationTokenSecretManager rmDTSecretManager) { + RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync) { super(ClientRMService.class.getName()); this.scheduler = scheduler; this.rmContext = rmContext; this.rmAppManager = rmAppManager; this.applicationsACLsManager = applicationACLsManager; this.queueACLsManager = queueACLsManager; - this.rmDTSecretManager = rmDTSecretManager; + this.rmDTSecretManagerAsync = rmDTSecretManagerAsync; } @Override @@ -162,7 +163,7 @@ protected void serviceStart() throws Exception { this.server = rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, - conf, this.rmDTSecretManager, + conf, this.rmDTSecretManagerAsync.getRMDelegationTokenSecretManager(), conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); @@ -659,20 +660,32 @@ public GetDelegationTokenResponse getDelegationToken( if (ugi.getRealUser() != null) { realUser = new Text(ugi.getRealUser().getUserName()); } - RMDelegationTokenIdentifier tokenIdentifier = + RMDelegationTokenIdentifier rmDTIdentifier = new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser); - Token realRMDTtoken = - new Token(tokenIdentifier, - this.rmDTSecretManager); - response.setRMDelegationToken( - BuilderUtils.newDelegationToken( - realRMDTtoken.getIdentifier(), - realRMDTtoken.getKind().toString(), - realRMDTtoken.getPassword(), - realRMDTtoken.getService().toString() - )); - return response; + RMDTOperationState state = + rmDTSecretManagerAsync.createTokenAsync(rmDTIdentifier); + if (state == RMDTOperationState.IN_PROFESS) { + response.setIsObtained(false); + return response; + } else { + Token token = + rmDTSecretManagerAsync.pollToken(rmDTIdentifier); + // in case the operation cleanup thread has cleaned the result before polling + if (token == null) { + response.setIsObtained(false); + return response; + } else { + response.setIsObtained(true); + response.setRMDelegationToken( + BuilderUtils.newDelegationToken( + token.getIdentifier(), + token.getKind().toString(), + token.getPassword(), + token.getService().toString())); + return response; + } + } } catch(IOException io) { throw RPCUtil.getRemoteException(io); } @@ -693,11 +706,25 @@ public RenewDelegationTokenResponse renewDelegationToken( new Text(protoToken.getKind()), new Text(protoToken.getService())); String user = getRenewerForToken(token); - long nextExpTime = rmDTSecretManager.renewToken(token, user); + RMDTOperationState state = + rmDTSecretManagerAsync.renewTokenAsync(token, user); RenewDelegationTokenResponse renewResponse = Records .newRecord(RenewDelegationTokenResponse.class); - renewResponse.setNextExpirationTime(nextExpTime); - return renewResponse; + if (state == RMDTOperationState.IN_PROFESS) { + renewResponse.setIsRenewed(false); + return renewResponse; + } else { + // in case the operation cleanup thread has cleaned the result before polling + Long renewTime = rmDTSecretManagerAsync.pollRenewTime(token); + if (renewTime == null) { + renewResponse.setIsRenewed(false); + return renewResponse; + } else { + renewResponse.setIsRenewed(true); + renewResponse.setNextExpirationTime(renewTime.longValue()); + return renewResponse; + } + } } catch (IOException e) { throw RPCUtil.getRemoteException(e); } @@ -717,8 +744,18 @@ public CancelDelegationTokenResponse cancelDelegationToken( new Text(protoToken.getKind()), new Text(protoToken.getService())); String user = getRenewerForToken(token); - rmDTSecretManager.cancelToken(token, user); - return Records.newRecord(CancelDelegationTokenResponse.class); + RMDTOperationState state = + rmDTSecretManagerAsync.cancelTokenAsync(token, user); + CancelDelegationTokenResponse cancelResponse = + Records.newRecord(CancelDelegationTokenResponse.class); + if (state == RMDTOperationState.IN_PROFESS) { + cancelResponse.setIsCanceled(false); + return cancelResponse; + } else { + rmDTSecretManagerAsync.confirmCancellation(token); + cancelResponse.setIsCanceled(true); + return cancelResponse; + } } catch (IOException e) { throw RPCUtil.getRemoteException(e); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 79fb5df..019823b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; /** * Context of the ResourceManager. @@ -88,10 +88,10 @@ void setClientRMService(ClientRMService clientRMService); - RMDelegationTokenSecretManager getRMDelegationTokenSecretManager(); + RMDelegationTokenSecretManagerAsync getRMDelegationTokenSecretManagerAsync(); - void setRMDelegationTokenSecretManager( - RMDelegationTokenSecretManager delegationTokenSecretManager); + void setRMDelegationTokenSecretManagerAsync( + RMDelegationTokenSecretManagerAsync delegationTokenSecretManagerAsync); RMApplicationHistoryWriter getRMApplicationHistoryWriter(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 689a091..102ac3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; import com.google.common.annotations.VisibleForTesting; @@ -74,7 +74,7 @@ private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; private AdminService adminService; private ClientRMService clientRMService; - private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; + private RMDelegationTokenSecretManagerAsync rmDelegationTokenSecretManagerAsync; private ResourceScheduler scheduler; private NodesListManager nodesListManager; private ResourceTrackerService resourceTrackerService; @@ -250,14 +250,16 @@ public void setClientRMService(ClientRMService clientRMService) { } @Override - public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { - return this.rmDelegationTokenSecretManager; + public RMDelegationTokenSecretManagerAsync + getRMDelegationTokenSecretManagerAsync() { + return this.rmDelegationTokenSecretManagerAsync; } @Override - public void setRMDelegationTokenSecretManager( - RMDelegationTokenSecretManager delegationTokenSecretManager) { - this.rmDelegationTokenSecretManager = delegationTokenSecretManager; + public void setRMDelegationTokenSecretManagerAsync( + RMDelegationTokenSecretManagerAsync delegationTokenSecretManagerAsync) { + this.rmDelegationTokenSecretManagerAsync = + delegationTokenSecretManagerAsync; } void setContainerAllocationExpirer( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java index 9fdde65..ed9eab3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java @@ -18,7 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -28,8 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; -import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; public class RMSecretManagerService extends AbstractService { @@ -38,6 +40,7 @@ ClientToAMTokenSecretManagerInRM clientToAMSecretManager; RMContainerTokenSecretManager containerTokenSecretManager; RMDelegationTokenSecretManager rmDTSecretManager; + RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync; RMContextImpl rmContext; @@ -65,11 +68,14 @@ public RMSecretManagerService(Configuration conf, RMContextImpl rmContext) { rmDTSecretManager = createRMDelegationTokenSecretManager(conf, rmContext); - rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager); + rmDTSecretManagerAsync = + new RMDelegationTokenSecretManagerAsync(rmDTSecretManager); + rmContext.setRMDelegationTokenSecretManagerAsync(rmDTSecretManagerAsync); } @Override public void serviceInit(Configuration conf) throws Exception { + rmDTSecretManagerAsync.init(conf); super.serviceInit(conf); } @@ -84,6 +90,7 @@ public void serviceStart() throws Exception { } catch(IOException ie) { throw new YarnRuntimeException("Failed to start secret manager threads", ie); } + rmDTSecretManagerAsync.start(); super.serviceStart(); } @@ -92,6 +99,9 @@ public void serviceStop() throws Exception { if (rmDTSecretManager != null) { rmDTSecretManager.stopThreads(); } + if (rmDTSecretManagerAsync != null) { + rmDTSecretManagerAsync.stop(); + } if (amRmTokenSecretManager != null) { amRmTokenSecretManager.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b0fa8f3..a768856 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -87,7 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; @@ -922,7 +922,7 @@ protected ResourceTrackerService createResourceTrackerService() { protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, - getRMDTSecretManager()); + getRMDTSecretManagerAsync()); } protected ApplicationMasterService createApplicationMasterService() { @@ -991,14 +991,15 @@ public AMRMTokenSecretManager getAMRMTokenSecretManager(){ } @Private - public RMDelegationTokenSecretManager getRMDTSecretManager(){ - return this.rmContext.getRMDelegationTokenSecretManager(); + public RMDelegationTokenSecretManagerAsync getRMDTSecretManagerAsync(){ + return this.rmContext.getRMDelegationTokenSecretManagerAsync(); } @Override public void recover(RMState state) throws Exception { // recover RMdelegationTokenSecretManager - getRMDTSecretManager().recover(state); + getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager() + .recover(state); // recover applications rmAppManager.recover(state); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index ce9f7ae..b168610 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -145,7 +145,8 @@ protected void serviceStart() throws Exception { } // enable RM to short-circuit token operations directly to itself RMDelegationTokenIdentifier.Renewer.setSecretManager( - rmContext.getRMDelegationTokenSecretManager(), + rmContext.getRMDelegationTokenSecretManagerAsync() + .getRMDelegationTokenSecretManager(), rmContext.getClientRMService().getBindAddress()); serviceStateLock.writeLock().lock(); isServiceStarted = true; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java new file mode 100644 index 0000000..c81df37 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +public class RMDelegationTokenEvent extends + AbstractEvent { + + private RMDelegationTokenIdentifier rmDTIdentifier; + private Token token; + private String user; + + public RMDelegationTokenEvent(RMDelegationTokenEventType type, + RMDelegationTokenIdentifier rmDTIdentifier, + Token token, String user) { + super(type); + this.rmDTIdentifier = rmDTIdentifier; + this.token = token; + this.user = user; + } + + public RMDelegationTokenIdentifier getRMDelegationTokenIdentifier() { + return rmDTIdentifier; + } + + public Token getToken() { + return token; + } + + public String getUser() { + return user; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java new file mode 100644 index 0000000..275d7a8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + + +public enum RMDelegationTokenEventType { + CREATE_RMDT, + CANCEL_RMDT, + RENEW_RMDT +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index 23939de..fd90043 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -75,6 +75,13 @@ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, } @Override + @VisibleForTesting + public synchronized byte[] createPassword( + RMDelegationTokenIdentifier identifier) { + return super.createPassword(identifier); + } + + @Override public RMDelegationTokenIdentifier createIdentifier() { return new RMDelegationTokenIdentifier(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManagerAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManagerAsync.java new file mode 100644 index 0000000..dfc3b39 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManagerAsync.java @@ -0,0 +1,448 @@ +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; + +import com.google.common.annotations.VisibleForTesting; + +/** + * {@link RMDelegationTokenSecretManagerAsync} handles the RPC requests of + * delegation tokens on a separate thread other than the main thread of + * ResourceManager dispatcher. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RMDelegationTokenSecretManagerAsync extends AbstractService { + + private static final Log LOG = + LogFactory.getLog(RMDelegationTokenSecretManagerAsync.class); + + private AsyncDispatcher dispatcher; + private RMDelegationTokenSecretManager manager; + @VisibleForTesting + ConcurrentMap operations = + new ConcurrentHashMap(); + + private long operationsCleanupInterval; + private long operationsCleanupDelay; + private Thread operationsCleaner; + + public RMDelegationTokenSecretManagerAsync( + RMDelegationTokenSecretManager manager) { + super(RMDelegationTokenSecretManagerAsync.class.getName()); + this.manager = manager; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // create async handler + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.register( + RMDelegationTokenEventType.class, new ForwardingEventHandler()); + dispatcher.setDrainEventsOnStop(); + + // create cache cleaner + operationsCleanupInterval = + conf.getLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS); + operationsCleanupDelay = + conf.getLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS); + operationsCleaner = new Daemon(new Runnable() { + + @Override + public void run() { + while (true) { + try { + Thread.sleep(operationsCleanupInterval); + } catch (InterruptedException e) { + LOG.warn( + "The thread of cleanup finished delegation token operations is interrupted"); + return; + } + for (RMDTOperationKey key : operations.keySet()) { + RMDTOperationValue value = operations.get(key); + if (value != null && value.state == RMDTOperationState.FINISHED) { + long delay = System.currentTimeMillis() - value.timestamp; + if (delay >= operationsCleanupDelay) { + operations.remove(key); + } + } + } + } + } + }); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + dispatcher.start(); + operationsCleaner.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (dispatcher != null) { + dispatcher.stop(); + } + if (operationsCleaner != null) { + operationsCleaner.interrupt(); + operationsCleaner.join(); + } + super.serviceStop(); + } + + /** + * Non-blocking API + * Generate a new delegation token + */ + @SuppressWarnings("unchecked") + public RMDTOperationState createTokenAsync( + RMDelegationTokenIdentifier rmDTIdentifier) { + RMDTOperationKey key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.GET); + RMDTOperationValue value = + new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null); + value = operations.putIfAbsent(key, value); + if (value == null) { + dispatcher.getEventHandler().handle( + new RMDelegationTokenEvent(RMDelegationTokenEventType.CREATE_RMDT, + rmDTIdentifier, null, null)); + return RMDTOperationState.IN_PROFESS; + } else { + return value.state; + } + } + + /** + * Non-blocking API + * Cancel a delegation token + */ + @SuppressWarnings("unchecked") + public RMDTOperationState cancelTokenAsync( + Token token, String canceller) + throws IOException { + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + RMDTOperationKey key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.CANCEL); + RMDTOperationValue value = + new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null); + value = operations.putIfAbsent(key, value); + if (value == null) { + dispatcher.getEventHandler().handle( + new RMDelegationTokenEvent(RMDelegationTokenEventType.CANCEL_RMDT, + rmDTIdentifier, token, canceller)); + return RMDTOperationState.IN_PROFESS; + } else { + return value.state; + } + } + + /** + * Non-blocking API + * Renew a delegation token + */ + @SuppressWarnings("unchecked") + public RMDTOperationState renewTokenAsync( + Token token, + String renewer) throws IOException { + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + RMDTOperationKey key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.RENEW); + RMDTOperationValue value = + new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null); + value = operations.putIfAbsent(key, value); + if (value == null) { + dispatcher.getEventHandler().handle( + new RMDelegationTokenEvent(RMDelegationTokenEventType.RENEW_RMDT, + rmDTIdentifier, token, renewer)); + return RMDTOperationState.IN_PROFESS; + } else { + return value.state; + } + } + + /** + * Get the delegation token and remove the getting operation cache from + * {@link RMDelegationTokenSecretManagerAsync} memory + */ + @SuppressWarnings("unchecked") + public Token pollToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws IOException { + Object obj = + pollRMDTOperationResult(new RMDTOperationKey(rmDTIdentifier.getOwner(), + rmDTIdentifier.getRealUser(), rmDTIdentifier.getRenewer(), + RMDTOperationType.GET)); + if (obj == null) { + return null; + } else if (obj instanceof Token) { + return (Token) obj; + } else { + throw new IOException("The instance of " + obj.getClass().getName() + + " is not expected result"); + } + } + + /** + * Confirm {@link ClientRMService} is aware the the completion of token + * cancellation and remove the cancellation operation cache from + * {@link RMDelegationTokenSecretManagerAsync} memory + */ + public void confirmCancellation(Token token) + throws IOException { + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + pollRMDTOperationResult(new RMDTOperationKey(rmDTIdentifier.getOwner(), + rmDTIdentifier.getRealUser(), rmDTIdentifier.getRenewer(), + RMDTOperationType.CANCEL)); + } + + /** + * Get the renew time and remove the renewing operation cache from + * {@link RMDelegationTokenSecretManagerAsync} memory + */ + public Long pollRenewTime(Token token) + throws IOException { + RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier(); + Object obj = + pollRMDTOperationResult(new RMDTOperationKey(rmDTIdentifier.getOwner(), + rmDTIdentifier.getRealUser(), rmDTIdentifier.getRenewer(), + RMDTOperationType.RENEW)); + if (obj == null) { + return null; + } else if (obj instanceof Long) { + return (Long) obj; + } else { + throw new IOException("The instance of " + obj.getClass().getName() + + " is not expected result"); + } + } + + /** + * Get {@link RMDelegationTokenSecretManager} + */ + public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { + return manager; + } + + private Object pollRMDTOperationResult(RMDTOperationKey key) + throws IOException { + RMDTOperationValue value = operations.remove(key); + if (value == null) { + return null; + } else { + if (value.exception == null) { + return value.result; + } else { + throw value.exception; + } + } + } + + private void handleRMDelegationTokenEvent(RMDelegationTokenEvent event) { + RMDelegationTokenIdentifier rmDTIdentifier = + event.getRMDelegationTokenIdentifier(); + RMDTOperationKey key; + RMDTOperationValue value; + Token token; + switch (event.getType()) { + case CREATE_RMDT: + token = new Token( + event.getRMDelegationTokenIdentifier(), manager); + key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.GET); + value = operations.get(key); + if (value != null) { + value.state = RMDTOperationState.FINISHED; + value.timestamp = System.currentTimeMillis(); + value.result = token; + } + break; + case CANCEL_RMDT: + try { + token = event.getToken(); + String canceller = event.getUser(); + manager.cancelToken(token, canceller); + key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.CANCEL); + value = operations.get(key); + if (value != null) { + value.state = RMDTOperationState.FINISHED; + value.timestamp = System.currentTimeMillis(); + } + } catch (IOException e) { + LOG.error("Error when cancelling the token.", e); + key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.CANCEL); + value = operations.get(key); + if (value != null) { + value.state = RMDTOperationState.FINISHED; + value.timestamp = System.currentTimeMillis(); + value.exception = e; + } + } + break; + case RENEW_RMDT: + try { + token = event.getToken(); + String renewer = event.getUser(); + long renewTime = manager.renewToken(token, renewer); + key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.RENEW); + value = operations.get(key); + if (value != null) { + value.state = RMDTOperationState.FINISHED; + value.timestamp = System.currentTimeMillis(); + value.result = new Long(renewTime); + } + } catch (IOException e) { + LOG.error("Error when renewing the token.", e); + key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), RMDTOperationType.RENEW); + value = operations.get(key); + if (value != null) { + value.state = RMDTOperationState.FINISHED; + value.timestamp = System.currentTimeMillis(); + value.exception = e; + } + } + break; + default: + LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + } + } + + /** + * EventHandler implementation which forward events to + * {@link RMDelegationTokenSecretManagerAsync} This hides the EventHandle + * methods of the store from its public interface + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + public void handle(RMDelegationTokenEvent event) { + handleRMDelegationTokenEvent(event); + } + } + + @VisibleForTesting + static class RMDTOperationKey { + + private Text owner; + private Text realUser; + private Text renewer; + private RMDTOperationType type; + + public RMDTOperationKey(Text owner, Text realUser, Text renewer, + RMDTOperationType type) { + super(); + this.owner = owner; + this.realUser = realUser; + this.renewer = renewer; + this.type = type; + } + + @Override + public int hashCode() { + // generated by eclipse + final int prime = 31; + int result = 1; + result = prime * result + ((owner == null) ? 0 : owner.hashCode()); + result = prime * result + ((realUser == null) ? 0 : realUser.hashCode()); + result = prime * result + ((renewer == null) ? 0 : renewer.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + // generated by eclipse + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RMDTOperationKey other = (RMDTOperationKey) obj; + if (owner == null) { + if (other.owner != null) + return false; + } else if (!owner.equals(other.owner)) + return false; + if (realUser == null) { + if (other.realUser != null) + return false; + } else if (!realUser.equals(other.realUser)) + return false; + if (renewer == null) { + if (other.renewer != null) + return false; + } else if (!renewer.equals(other.renewer)) + return false; + if (type != other.type) + return false; + return true; + } + + } + + @VisibleForTesting + static class RMDTOperationValue { + + @VisibleForTesting + RMDTOperationState state; + private Object result; + private IOException exception; + @VisibleForTesting + long timestamp; + + public RMDTOperationValue(RMDTOperationState state, Object result, + IOException exception) { + super(); + this.state = state; + this.result = result; + this.exception = exception; + timestamp = 0L; + } + + } + + @VisibleForTesting + static enum RMDTOperationType { + GET, CANCEL, RENEW + } + + public static enum RMDTOperationState { + IN_PROFESS, FINISHED + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 63efe8f..81848b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -361,7 +361,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), rmAppManager, applicationACLsManager, queueACLsManager, - getRMDTSecretManager()) { + getRMDTSecretManagerAsync()) { @Override protected void serviceStart() { // override to not start rpc handler diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java index b400e4f..6d6e391 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java @@ -86,7 +86,7 @@ public void setup() throws InterruptedException, IOException { protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), this.scheduler, this.rmAppManager, this.applicationACLsManager, - this.queueACLsManager, getRMDTSecretManager()); + this.queueACLsManager, getRMDTSecretManagerAsync()); }; @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 7c49681..7fdd22b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -41,10 +41,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; -import com.google.common.collect.Sets; import junit.framework.Assert; -import org.apache.commons.lang.math.LongRange; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -54,15 +52,20 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -75,7 +78,6 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; @@ -96,6 +98,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; @@ -104,6 +107,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestClientRMService { private static final Log LOG = LogFactory.getLog(TestClientRMService.class); @@ -114,6 +119,7 @@ private String appType = "MockApp"; private static RMDelegationTokenSecretManager dtsm; + private static RMDelegationTokenSecretManagerAsync dtsmAsync; private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; @@ -124,6 +130,9 @@ public static void setupSecretManager() throws IOException { when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); dtsm.startThreads(); + dtsmAsync = new RMDelegationTokenSecretManagerAsync(dtsm); + dtsmAsync.init(new YarnConfiguration()); + dtsmAsync.start(); } @AfterClass @@ -131,6 +140,9 @@ public static void teardownSecretManager() { if (dtsm != null) { dtsm.stopThreads(); } + if (dtsmAsync != null) { + dtsmAsync.stop(); + } } @Test @@ -139,7 +151,7 @@ public void testGetClusterNodes() throws Exception { protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, - this.getRMDTSecretManager()); + this.getRMDTSecretManagerAsync()); }; }; rm.start(); @@ -326,7 +338,8 @@ public Void run() throws Exception { } private void checkTokenRenewal(UserGroupInformation owner, - UserGroupInformation renewer) throws IOException, YarnException { + UserGroupInformation renewer) + throws IOException, YarnException, InterruptedException { RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier( new Text(owner.getUserName()), new Text(renewer.getUserName()), null); @@ -341,8 +354,84 @@ private void checkTokenRenewal(UserGroupInformation owner, RMContext rmContext = mock(RMContext.class); ClientRMService rmService = new ClientRMService( - rmContext, null, null, null, null, dtsm); - rmService.renewDelegationToken(request); + rmContext, null, null, null, null, dtsmAsync); + while (true) { + if (rmService.renewDelegationToken(request).getIsRenewed()) { + break; + } else { + Thread.sleep(100); + } + } + } + + @SuppressWarnings("resource") + @Test + public void testGetRnewCancelDelegationToken() throws Exception { + RMContext rmContext = mock(RMContext.class); + final ClientRMService rmService = new ClientRMService( + rmContext, null, null, null, null, dtsmAsync); + + int count = 0; + int maxCount = 10; + GetDelegationTokenResponse getResp = null; + for (; count < maxCount; ++count) { + getResp = owner.doAs( + new PrivilegedExceptionAction() { + @Override + public GetDelegationTokenResponse run() throws Exception { + GetDelegationTokenRequest request = + GetDelegationTokenRequest.newInstance(owner.getUserName()); + return rmService.getDelegationToken(request); + } + }); + if (getResp.getIsObtained()) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertTrue(count > 0); + Assert.assertTrue(count < 10); + + final org.apache.hadoop.yarn.api.records.Token token = + getResp.getRMDelegationToken(); + for (count = 0; count < maxCount; ++count) { + RenewDelegationTokenResponse renewResp = + owner.doAs(new PrivilegedExceptionAction() { + @Override + public RenewDelegationTokenResponse run() throws Exception { + RenewDelegationTokenRequest request = + RenewDelegationTokenRequest.newInstance(token); + return rmService.renewDelegationToken(request); + } + }); + if (renewResp.getIsRenewed()) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertTrue(count > 0); + Assert.assertTrue(count < 10); + + for (count = 0; count < maxCount; ++count) { + CancelDelegationTokenResponse cancelResp = + owner.doAs(new PrivilegedExceptionAction() { + @Override + public CancelDelegationTokenResponse run() throws Exception { + CancelDelegationTokenRequest request = + CancelDelegationTokenRequest.newInstance(token); + return rmService.cancelDelegationToken(request); + } + }); + if (cancelResp.getIsCanceled()) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertTrue(count > 0); + Assert.assertTrue(count < 10); } @Test (timeout = 30000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index d389c0e..091de58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -82,7 +84,7 @@ public void resetSecretManager() { RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null); } - @Test + @Test (timeout = 30000) public void testDelegationToken() throws IOException, InterruptedException { final YarnConfiguration conf = new YarnConfiguration(); @@ -103,9 +105,13 @@ public void testDelegationToken() throws IOException, InterruptedException { LOG.info("Creating DelegationTokenSecretManager with initialInterval: " + initialInterval + ", maxLifetime: " + maxLifetime + ", renewInterval: " + renewInterval); + RMDelegationTokenSecretManagerAsync rmDtSecretManagerAsync = + new RMDelegationTokenSecretManagerAsync(rmDtSecretManager); + rmDtSecretManagerAsync.init(conf); + rmDtSecretManagerAsync.start(); final ClientRMService clientRMService = new ClientRMServiceForTest(conf, - scheduler, rmDtSecretManager); + scheduler, rmDtSecretManagerAsync); clientRMService.init(conf); clientRMService.start(); @@ -225,6 +231,7 @@ public void testDelegationToken() throws IOException, InterruptedException { } finally { rmDtSecretManager.stopThreads(); + rmDtSecretManagerAsync.stop(); // TODO PRECOMMIT Close proxies. if (clientRMWithDT != null) { RPC.stopProxy(clientRMWithDT); @@ -362,8 +369,19 @@ public Server getServer(Class protocol, Object instance, GetDelegationTokenRequest request = Records .newRecord(GetDelegationTokenRequest.class); request.setRenewer(renewerString); - return clientRMService.getDelegationToken(request) - .getRMDelegationToken(); + do { + GetDelegationTokenResponse response = + clientRMService.getDelegationToken(request); + if (response.getIsObtained()) { + return response.getRMDelegationToken(); + } else { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + fail(); + } + } + } while (true); } }); return token; @@ -428,10 +446,10 @@ public ApplicationClientProtocol run() { public ClientRMServiceForTest(Configuration conf, ResourceScheduler scheduler, - RMDelegationTokenSecretManager rmDTSecretManager) { + RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync) { super(mock(RMContext.class), scheduler, mock(RMAppManager.class), new ApplicationACLsManager(conf), new QueueACLsManager(scheduler, - conf), rmDTSecretManager); + conf), rmDTSecretManagerAsync); } // Use a random port unless explicitly specified. @@ -443,8 +461,9 @@ InetSocketAddress getBindAddress(Configuration conf) { @Override protected void serviceStop() throws Exception { - if (rmDTSecretManager != null) { - rmDTSecretManager.stopThreads(); + if (rmDTSecretManagerAsync != null) { + rmDTSecretManagerAsync.getRMDelegationTokenSecretManager().stopThreads(); + rmDTSecretManagerAsync.stop(); } super.serviceStop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index dff9019..dcab9b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -51,6 +51,8 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -59,6 +61,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -1045,7 +1049,7 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() userText1); Token token1 = new Token(dtId1, - rm1.getRMDTSecretManager()); + rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager()); SecurityUtil.setTokenService(token1, rmAddr); ts.addToken(userText1, token1); tokenSet.add(token1); @@ -1056,7 +1060,7 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() userText2); Token token2 = new Token(dtId2, - rm1.getRMDTSecretManager()); + rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager()); SecurityUtil.setTokenService(token2, rmAddr); ts.addToken(userText2, token2); tokenSet.add(token2); @@ -1205,7 +1209,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 30000) public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set( @@ -1236,8 +1240,15 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { GetDelegationTokenRequest.newInstance("renewer1"); UserGroupInformation.getCurrentUser().setAuthenticationMethod( AuthMethod.KERBEROS); - GetDelegationTokenResponse response1 = - rm1.getClientRMService().getDelegationToken(request1); + GetDelegationTokenResponse response1; + do { + response1 = rm1.getClientRMService().getDelegationToken(request1); + if (response1.getIsObtained()) { + break; + } else { + Thread.sleep(200); + } + } while (true); org.apache.hadoop.yarn.api.records.Token delegationToken1 = response1.getRMDelegationToken(); Token token1 = @@ -1258,25 +1269,35 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { Assert.assertNotNull(appState); // assert all master keys are saved - Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys(); + Set allKeysRM1 = rm1.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager().getAllMasterKeys(); Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); // assert all tokens are saved Map allTokensRM1 = - rm1.getRMDTSecretManager().getAllTokens(); + rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager() + .getAllTokens(); Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); Assert.assertEquals(allTokensRM1, rmDTState); // assert sequence number is saved Assert.assertEquals( - rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), + rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager() + .getLatestDTSequenceNumber(), rmState.getRMDTSecretManagerState().getDTSequenceNumber()); // request one more token GetDelegationTokenRequest request2 = GetDelegationTokenRequest.newInstance("renewer2"); - GetDelegationTokenResponse response2 = - rm1.getClientRMService().getDelegationToken(request2); + GetDelegationTokenResponse response2; + do { + response2 = rm1.getClientRMService().getDelegationToken(request2); + if (response2.getIsObtained()) { + break; + } else { + Thread.sleep(200); + } + } while (true); org.apache.hadoop.yarn.api.records.Token delegationToken2 = response2.getRMDelegationToken(); Token token2 = @@ -1284,16 +1305,21 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier(); // cancel token2 - try{ - rm1.getRMDTSecretManager().cancelToken(token2, - UserGroupInformation.getCurrentUser().getUserName()); - } catch(Exception e) { - Assert.fail(); - } + CancelDelegationTokenRequest request3 = + CancelDelegationTokenRequest.newInstance(delegationToken2); + do { + CancelDelegationTokenResponse response3 = + rm1.getClientRMService().cancelDelegationToken(request3); + if (response3.getIsCanceled()) { + break; + } else { + Thread.sleep(200); + } + } while (true); // Assert the token which has the latest delegationTokenSequenceNumber is removed - Assert.assertEquals( - rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), + Assert.assertEquals(rm1.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager().getLatestDTSequenceNumber(), dtId2.getSequenceNumber()); Assert.assertFalse(rmDTState.containsKey(dtId2)); @@ -1303,29 +1329,40 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // assert master keys and tokens are populated back to DTSecretManager Map allTokensRM2 = - rm2.getRMDTSecretManager().getAllTokens(); + rm2.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager() + .getAllTokens(); Assert.assertEquals(allTokensRM2.keySet(), allTokensRM1.keySet()); // rm2 has its own master keys when it starts, we use containsAll here - Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() - .containsAll(allKeysRM1)); + Assert.assertTrue(rm2.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager() .getAllMasterKeys() + .containsAll(allKeysRM1)); // assert sequenceNumber is properly recovered, // even though the token which has max sequenceNumber is not stored - Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), - rm2.getRMDTSecretManager().getLatestDTSequenceNumber()); + Assert.assertEquals(rm1.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager().getLatestDTSequenceNumber(), + rm2.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager().getLatestDTSequenceNumber()); // renewDate before renewing Long renewDateBeforeRenew = allTokensRM2.get(dtId1); - try{ - // Sleep for one millisecond to make sure renewDataAfterRenew is greater - Thread.sleep(1); - // renew recovered token - rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); - } catch(Exception e) { - Assert.fail(); - } + // Sleep for one millisecond to make sure renewDataAfterRenew is greater + Thread.sleep(1); + // renew recovered token + RenewDelegationTokenRequest request4 = + RenewDelegationTokenRequest.newInstance(delegationToken1); + do { + RenewDelegationTokenResponse response4 = + rm2.getClientRMService().renewDelegationToken(request4); + if (response4.getIsRenewed()) { + break; + } else { + Thread.sleep(200); + } + } while (true); - allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + allTokensRM2 = rm2.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager().getAllTokens(); Long renewDateAfterRenew = allTokensRM2.get(dtId1); // assert token is renewed Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew); @@ -1335,15 +1372,21 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // assert old token is removed from state store Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); - try{ - rm2.getRMDTSecretManager().cancelToken(token1, - UserGroupInformation.getCurrentUser().getUserName()); - } catch(Exception e) { - Assert.fail(); - } + CancelDelegationTokenRequest request5 = + CancelDelegationTokenRequest.newInstance(delegationToken1); + do { + CancelDelegationTokenResponse response5 = + rm2.getClientRMService().cancelDelegationToken(request5); + if (response5.getIsCanceled()) { + break; + } else { + Thread.sleep(200); + } + } while (true); // assert token is removed from state after its cancelled - allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + allTokensRM2 = rm2.getRMDTSecretManagerAsync() + .getRMDelegationTokenSecretManager().getAllTokens(); Assert.assertFalse(allTokensRM2.containsKey(dtId1)); Assert.assertFalse(rmDTState.containsKey(dtId1)); @@ -1354,7 +1397,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // This is to test submit an application to the new RM with the old delegation // token got from previous RM. - @Test + @Test (timeout = 30000) public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -1372,8 +1415,15 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() GetDelegationTokenRequest.newInstance("renewer1"); UserGroupInformation.getCurrentUser().setAuthenticationMethod( AuthMethod.KERBEROS); - GetDelegationTokenResponse response1 = - rm1.getClientRMService().getDelegationToken(request1); + GetDelegationTokenResponse response1; + do { + response1 = rm1.getClientRMService().getDelegationToken(request1); + if (response1.getIsObtained()) { + break; + } else { + Thread.sleep(200); + } + } while (true); Token token1 = ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr); @@ -1762,7 +1812,7 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) { @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), - rmAppManager, applicationACLsManager, null, getRMDTSecretManager()){ + rmAppManager, applicationACLsManager, null, getRMDTSecretManagerAsync()){ @Override protected void serviceStart() throws Exception { // do nothing diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index 6a209e7..f1e47a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -168,7 +168,7 @@ public void testClientToAMTokens() throws Exception { protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, - getRMDTSecretManager()); + getRMDTSecretManagerAsync()); }; @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 3b5add8..6105a68 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -1,24 +1,26 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.security; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.HashSet; @@ -27,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -43,6 +46,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationKey; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationState; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationType; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationValue; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -84,7 +91,8 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { // the other is created on the first run of // tokenRemoverThread.rollMasterKey() - RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + RMDelegationTokenSecretManager dtSecretManager = + rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager(); // assert all master keys are saved Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); Set expiringKeys = new HashSet(); @@ -97,17 +105,24 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { // request to generate a RMDelegationToken GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); when(request.getRenewer()).thenReturn("renewer1"); - GetDelegationTokenResponse response = - rm1.getClientRMService().getDelegationToken(request); - org.apache.hadoop.yarn.api.records.Token delegationToken = - response.getRMDelegationToken(); + org.apache.hadoop.yarn.api.records.Token delegationToken; + do { + GetDelegationTokenResponse response = + rm1.getClientRMService().getDelegationToken(request); + if (response.getIsObtained()) { + delegationToken = response.getRMDelegationToken(); + break; + } else { + Thread.sleep(200); + } + } while (true); Token token1 = ConverterUtils.convertFromYarn(delegationToken, null); RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); // wait for the first rollMasterKey while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys - .get() < 1){ + .get() < 1) { Thread.sleep(200); } @@ -120,7 +135,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { // wait for token to expire // rollMasterKey is called every 1 second. while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys - .get() < 6) { + .get() < 6) { Thread.sleep(200); } @@ -139,7 +154,8 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { MockRM rm1 = new MyMockRM(conf, memStore); rm1.start(); - RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + RMDelegationTokenSecretManager dtSecretManager = + rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager(); // assert all master keys are saved Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); @@ -160,6 +176,85 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { } } + @SuppressWarnings("unchecked") + @Test + public void testRMDelegationTokenSecretManagerAsync() throws Exception { + RMDelegationTokenSecretManager dtSecretManager = + mock(RMDelegationTokenSecretManager.class); + when(dtSecretManager.createPassword(any(RMDelegationTokenIdentifier.class))) + .thenReturn(new byte[0]); + when(dtSecretManager.cancelToken(any(Token.class), any(String.class))) + .thenReturn(null); + when(dtSecretManager.renewToken(any(Token.class), any(String.class))) + .thenReturn(0L); + RMDelegationTokenSecretManagerAsync dtSecretManagerAsync = + new RMDelegationTokenSecretManagerAsync(dtSecretManager); + RMDelegationTokenIdentifier rmDTIdentifier = + new RMDelegationTokenIdentifier(new Text("owner"), + new Text("realUser"), new Text("renewer")); + dtSecretManagerAsync.init(new YarnConfiguration()); + dtSecretManagerAsync.start(); + // verify creating token is handled + dtSecretManagerAsync.createTokenAsync(rmDTIdentifier); + Thread.sleep(100); + verify(dtSecretManager).createPassword(rmDTIdentifier); + // verify renewing token is handled + Token token = mock(Token.class); + when(token.decodeIdentifier()).thenReturn(rmDTIdentifier); + dtSecretManagerAsync.renewTokenAsync(token, "renewer"); + Thread.sleep(100); + verify(dtSecretManager).renewToken(token, "renewer"); + // verify cancelling token is handled + dtSecretManagerAsync.cancelTokenAsync(token, "canceller"); + Thread.sleep(100); + verify(dtSecretManager).cancelToken(token, "canceller"); + dtSecretManagerAsync.stop(); + dtSecretManagerAsync.close(); + } + + @Test + public void testCleanupOutstandingFinishedDTOperations() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + conf.setLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS, + 1000); + conf.setLong( + YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS, + 500); + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + RMDelegationTokenSecretManagerAsync dtSecretManagerAsync = + rm1.getRMDTSecretManagerAsync(); + + for (RMDTOperationType type : RMDTOperationType.values()) { + RMDelegationTokenIdentifier rmDTIdentifier = + new RMDelegationTokenIdentifier( + new Text("owner"), new Text("realUser"), new Text("renew")); + RMDTOperationKey key = new RMDTOperationKey( + rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(), + rmDTIdentifier.getRenewer(), type); + RMDTOperationValue value = + new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null); + dtSecretManagerAsync.operations.put(key, value); + } + Assert.assertEquals(RMDTOperationType.values().length, + dtSecretManagerAsync.operations.size()); + for (RMDTOperationValue value : dtSecretManagerAsync.operations.values()) { + value.state = RMDTOperationState.FINISHED; + value.timestamp = System.currentTimeMillis(); + } + + // wait for cleanup + Thread.sleep(2000); + + Assert.assertEquals(0, dtSecretManagerAsync.operations.size()); + + rm1.stop(); + rm1.close(); + } + class MyMockRM extends TestSecurityMockRM { public MyMockRM(Configuration conf, RMStateStore store) { @@ -172,8 +267,8 @@ protected RMSecretManagerService createRMSecretManagerService() { @Override protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager(Configuration conf, - RMContext rmContext) { + createRMDelegationTokenSecretManager(Configuration conf, + RMContext rmContext) { // KeyUpdateInterval-> 1 seconds // TokenMaxLifetime-> 2 seconds. return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000, @@ -186,14 +281,15 @@ protected RMSecretManagerService createRMSecretManagerService() { public class TestRMDelegationTokenSecretManager extends RMDelegationTokenSecretManager { + public AtomicInteger numUpdatedKeys = new AtomicInteger(0); public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, long delegationTokenRemoverScanInterval, RMContext rmContext) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, - delegationTokenRenewInterval, delegationTokenRemoverScanInterval, - rmContext); + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + rmContext); } @Override