diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
index aeb20cd..eec4537 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
@@ -104,6 +104,7 @@ public void testClusterGetDelegationToken() throws Exception {
rmDTToken.setKind("Testclusterkind");
rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
rmDTToken.setService("0.0.0.0:8032");
+ getDTResponse.setIsObtained(true);
getDTResponse.setRMDelegationToken(rmDTToken);
final ApplicationClientProtocol cRMProtocol = mock(ApplicationClientProtocol.class);
when(cRMProtocol.getDelegationToken(any(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 0abaafb..7049415 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -298,13 +298,22 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
/**
*
The interface used by clients to get delegation token, enabling the
- * containers to be able to talk to the service using those tokens.
- *
- *
The ResourceManager responds with the delegation
- * {@link Token} that can be used by the client to speak to this
- * service.
+ * containers to be able to talk to the service using those tokens.
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of getting the delegation
+ * token is completed or not.
+ * - The {@link Token} that can be used by the client to speak to this
+ * service.
+ *
+ * Note: users have to wait until this flag becomes true to get the
+ * {@link Token}.
+ *
* @param request request to get a delegation token for the client.
- * @return delegation token that can be used to talk to this service
+ * @return the response including a flag and a delegation token that can be
+ * used to talk to this service
* @throws YarnException
* @throws IOException
*/
@@ -316,9 +325,20 @@ public GetDelegationTokenResponse getDelegationToken(
/**
* Renew an existing delegation {@link Token}.
- *
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of renewing the delegation
+ * token is completed or not.
+ * - The new expiry time for the delegation token.
+ *
+ * Note: users have to wait until this flag becomes true to get new expiry
+ * time.
+ *
* @param request the delegation token to be renewed.
- * @return the new expiry time for the delegation token.
+ * @return the response including a flag and a new expiry time for the
+ * delegation token.
* @throws YarnException
* @throws IOException
*/
@@ -330,9 +350,18 @@ public RenewDelegationTokenResponse renewDelegationToken(
/**
* Cancel an existing delegation {@link Token}.
- *
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of canceling the delegation
+ * token is completed or not.
+ *
+ * Note: users have to wait until this flag becomes true to confirm
+ * the delegation token is canceled.
+ *
* @param request the delegation token to be cancelled.
- * @return an empty response.
+ * @return the response including a flag.
* @throws YarnException
* @throws IOException
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
index 495d03e..d605c60 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -31,9 +33,27 @@
public abstract class CancelDelegationTokenResponse {
@Private
@Unstable
- public static CancelDelegationTokenResponse newInstance() {
+ public static CancelDelegationTokenResponse newInstance(boolean isCanceled) {
CancelDelegationTokenResponse response =
Records.newRecord(CancelDelegationTokenResponse.class);
+ response.setIsCanceled(isCanceled);
return response;
}
+
+ /**
+ * Get the flag which indicates that the process of canceling delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsCanceled();
+
+ /**
+ * Set the flag which indicates that the process of canceling delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsCanceled(boolean isCanceled);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
index bb7fcf0..9613619 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
@@ -39,14 +39,32 @@
@Private
@Unstable
- public static GetDelegationTokenResponse newInstance(Token rmDTToken) {
+ public static GetDelegationTokenResponse newInstance(
+ boolean isObtained, Token rmDTToken) {
GetDelegationTokenResponse response =
Records.newRecord(GetDelegationTokenResponse.class);
+ response.setIsObtained(isObtained);
response.setRMDelegationToken(rmDTToken);
return response;
}
/**
+ * Get the flag which indicates that the process of getting delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsObtained();
+
+ /**
+ * Set the flag which indicates that the process of getting delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsObtained(boolean isObtained);
+
+ /**
* The Delegation tokens have a identifier which maps to
* {@link AbstractDelegationTokenIdentifier}.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
index f0f7fad..5c449f1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -31,18 +33,43 @@
@Private
@Unstable
- public static RenewDelegationTokenResponse newInstance(long expTime) {
+ public static RenewDelegationTokenResponse newInstance(
+ boolean isRenewed, long expTime) {
RenewDelegationTokenResponse response =
Records.newRecord(RenewDelegationTokenResponse.class);
+ response.setIsRenewed(isRenewed);
response.setNextExpirationTime(expTime);
return response;
}
+ /**
+ * Get the flag which indicates that the process of renewing delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsRenewed();
+
+ /**
+ * Set the flag which indicates that the process of renewing delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsRenewed(boolean isRenewed);
+
+ /**
+ * Get the new expiration time.
+ */
@Private
@Unstable
public abstract long getNextExpirationTime();
+ /**
+ * Set the new expiration time
+ */
@Private
@Unstable
public abstract void setNextExpirationTime(long expTime);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 04ecea6..17625f8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -605,12 +605,24 @@
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
-
+
/** Delegation Token renewer thread count */
public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
RM_PREFIX + "delegation-token-renewer.thread-count";
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
+ /** Interval at which the thread of cleanup delegation token operations runs */
+ public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS =
+ RM_PREFIX + "delegation-token-operation.cleanup-interval-ms";
+ public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS =
+ 60000l;
+
+ /** The max life time of a delegation token operation after its finish before its cleanup */
+ public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS =
+ RM_PREFIX + "delegation-token-operation.cleanup-delay-ms";
+ public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS =
+ 30000l;
+
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto
index 7ad06c9..5a05b2e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/application_history_client.proto
@@ -32,8 +32,8 @@ service ApplicationHistoryProtocolService {
rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto);
rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto);
rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
- rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto);
- rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
- rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
+ rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.yarn.GetDelegationTokenResponseProto);
+ rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.yarn.RenewDelegationTokenResponseProto);
+ rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.yarn.CancelDelegationTokenResponseProto);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
index eda2641..88cc30d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
@@ -41,9 +41,9 @@ service ApplicationClientProtocolService {
rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);
rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
- rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto);
- rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
- rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
+ rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.yarn.GetDelegationTokenResponseProto);
+ rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.yarn.RenewDelegationTokenResponseProto);
+ rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.yarn.CancelDelegationTokenResponseProto);
rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index eff5cd7..f7674cd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -249,6 +249,20 @@ message GetContainerStatusesResponseProto {
repeated ContainerExceptionMapProto failed_requests = 2;
}
+message GetDelegationTokenResponseProto {
+ optional hadoop.common.TokenProto token = 1;
+ optional bool is_obtained = 2 [default = false];
+}
+
+message RenewDelegationTokenResponseProto {
+ optional uint64 newExpiryTime = 1;
+ optional bool is_renewed = 2 [default = false];
+}
+
+message CancelDelegationTokenResponseProto {
+ optional bool is_canceled = 2 [default = false];
+}
+
//////////////////////////////////////////////////////
/////// Application_History_Protocol /////////////////
//////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 51a7353..9d9a4ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -355,8 +355,25 @@ public Token getRMDelegationToken(Text renewer)
GetDelegationTokenRequest rmDTRequest =
Records.newRecord(GetDelegationTokenRequest.class);
rmDTRequest.setRenewer(renewer.toString());
- GetDelegationTokenResponse response =
- rmClient.getDelegationToken(rmDTRequest);
+
+ GetDelegationTokenResponse response;
+ int pollCount = 0;
+ while (true) {
+ response = rmClient.getDelegationToken(rmDTRequest);
+ if (response.getIsObtained()) {
+ break;
+ }
+ // Notify the client through the log every 10 poll, in case the client
+ // is blocked here too long.
+ if (++pollCount % 10 == 0) {
+ LOG.info("Get the RM delegation token is not finished");
+ }
+ try {
+ Thread.sleep(asyncApiPollIntervalMillis);
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while waiting for getting the delegation token");
+ }
+ }
return response.getRMDelegationToken();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 7c34966..adc940f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.client.api.impl;
-import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -38,12 +37,15 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -70,6 +72,7 @@
import org.apache.log4j.Logger;
import org.junit.Test;
+
public class TestYarnClient {
@Test
@@ -93,7 +96,7 @@ public void testClientStop() {
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
- public void testSubmitApplication() {
+ public void testSubmitApplication() throws Exception {
Configuration conf = new Configuration();
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
100); // speed up tests
@@ -148,6 +151,21 @@ public void testKillApplication() throws Exception {
.forceKillApplication(any(KillApplicationRequest.class));
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testGetDelegationToken() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+ 100); // speed up tests
+ final YarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+
+ client.getRMDelegationToken(new Text("test renewer"));
+ verify(((MockYarnClient) client).rmClient, times(5)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+ }
+
@Test(timeout = 30000)
public void testApplicationType() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@@ -265,6 +283,18 @@ public void start() {
Assert.fail("Exception is not expected.");
}
when(mockResponse.getApplicationReport()).thenReturn(mockReport);
+
+ try {
+ when(rmClient.getDelegationToken(
+ any(GetDelegationTokenRequest.class))).thenReturn(
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(true, null));
+ } catch (Exception e) {
+ Assert.fail(); // shouldn't happen with Mockito
+ }
}
public ApplicationClientProtocol getRMClient() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
index 61068e8..4297106 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
@@ -22,16 +22,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
-import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
-import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
-import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
@@ -44,10 +41,10 @@
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesRequestPBImpl;
@@ -69,14 +66,16 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
@@ -87,6 +86,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java
index 4511cc4..f6aa0e6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationHistoryProtocolPBServiceImpl.java
@@ -22,11 +22,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
-import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
-import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
-import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -57,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
@@ -69,6 +67,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
index ec2b2b2..54d74c7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
@@ -19,17 +19,21 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
-public class CancelDelegationTokenResponsePBImpl extends CancelDelegationTokenResponse {
+public class CancelDelegationTokenResponsePBImpl
+ extends CancelDelegationTokenResponse {
CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto
.getDefaultInstance();
+ CancelDelegationTokenResponseProto.Builder builder = null;
+ boolean viaProto = false;
public CancelDelegationTokenResponsePBImpl() {
}
@@ -39,6 +43,19 @@ public CancelDelegationTokenResponsePBImpl(
this.proto = proto;
}
+ @Override
+ public boolean getIsCanceled() {
+ CancelDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsCanceled();
+ }
+
+ @Override
+ public void setIsCanceled(boolean isCanceled) {
+ maybeInitBuilder();
+ builder.setIsCanceled(isCanceled);
+ }
+
public CancelDelegationTokenResponseProto getProto() {
return proto;
}
@@ -62,4 +79,12 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = CancelDelegationTokenResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
index 93f4b5b..4ba9b05 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
@@ -20,12 +20,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
-import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProtoOrBuilder;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -72,6 +72,19 @@ public void setRMDelegationToken(Token appToken) {
this.appToken = appToken;
}
+ @Override
+ public boolean getIsObtained() {
+ GetDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsObtained();
+ }
+
+ @Override
+ public void setIsObtained(boolean isObtained) {
+ maybeInitBuilder();
+ builder.setIsObtained(isObtained);
+ }
+
public GetDelegationTokenResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -128,4 +141,5 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
index 9d20b46..4f86bdf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
@@ -19,9 +19,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
-import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProtoOrBuilder;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -89,4 +89,18 @@ public void setNextExpirationTime(long expTime) {
maybeInitBuilder();
builder.setNewExpiryTime(expTime);
}
+
+ @Override
+ public boolean getIsRenewed() {
+ RenewDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsRenewed();
+ }
+
+ @Override
+ public void setIsRenewed(boolean isRenewed) {
+ maybeInitBuilder();
+ builder.setIsRenewed(isRenewed);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
index 418ccb2..1db3cfd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java
@@ -36,8 +36,11 @@
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
@@ -98,12 +101,22 @@ public static void setSecretManager(
public long renew(Token> token, Configuration conf) throws IOException,
InterruptedException {
final ApplicationClientProtocol rmClient = getRmClient(token, conf);
+ long asyncApiPollIntervalMillis =
+ conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
if (rmClient != null) {
try {
- RenewDelegationTokenRequest request =
- Records.newRecord(RenewDelegationTokenRequest.class);
- request.setDelegationToken(convertToProtoToken(token));
- return rmClient.renewDelegationToken(request).getNextExpirationTime();
+ while (true) {
+ RenewDelegationTokenRequest request =
+ Records.newRecord(RenewDelegationTokenRequest.class);
+ request.setDelegationToken(convertToProtoToken(token));
+ RenewDelegationTokenResponse response =
+ rmClient.renewDelegationToken(request);
+ if (response.getIsRenewed()) {
+ return response.getNextExpirationTime();
+ }
+ Thread.sleep(asyncApiPollIntervalMillis);
+ }
} catch (YarnException e) {
throw new IOException(e);
} finally {
@@ -120,12 +133,22 @@ public long renew(Token> token, Configuration conf) throws IOException,
public void cancel(Token> token, Configuration conf) throws IOException,
InterruptedException {
final ApplicationClientProtocol rmClient = getRmClient(token, conf);
+ long asyncApiPollIntervalMillis =
+ conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
if (rmClient != null) {
try {
- CancelDelegationTokenRequest request =
- Records.newRecord(CancelDelegationTokenRequest.class);
- request.setDelegationToken(convertToProtoToken(token));
- rmClient.cancelDelegationToken(request);
+ while (true) {
+ CancelDelegationTokenRequest request =
+ Records.newRecord(CancelDelegationTokenRequest.class);
+ request.setDelegationToken(convertToProtoToken(token));
+ CancelDelegationTokenResponse response =
+ rmClient.cancelDelegationToken(request);
+ if (response.getIsCanceled()) {
+ break;
+ }
+ Thread.sleep(asyncApiPollIntervalMillis);
+ }
} catch (YarnException e) {
throw new IOException(e);
} finally {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5b3ede7..fa137cc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -524,6 +524,20 @@
+ Interval at which the thread of cleanup delegation token
+ operations runs
+ yarn.resourcemanager.delegation-token-operation.cleanup-interval-ms
+ 60000
+
+
+
+ The max life time of a delegation token operation after its
+ finish before its cleanup
+ yarn.resourcemanager.delegation-token-operation.cleanup-delay-ms
+ 30000
+
+
+
Interval for the roll over for the master key used to generate
application tokens
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index fdde381..243027f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -101,7 +101,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationState;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -128,7 +129,7 @@
private final RMAppManager rmAppManager;
private Server server;
- protected RMDelegationTokenSecretManager rmDTSecretManager;
+ protected RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
InetSocketAddress clientBindAddress;
@@ -139,14 +140,14 @@
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
QueueACLsManager queueACLsManager,
- RMDelegationTokenSecretManager rmDTSecretManager) {
+ RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync) {
super(ClientRMService.class.getName());
this.scheduler = scheduler;
this.rmContext = rmContext;
this.rmAppManager = rmAppManager;
this.applicationsACLsManager = applicationACLsManager;
this.queueACLsManager = queueACLsManager;
- this.rmDTSecretManager = rmDTSecretManager;
+ this.rmDTSecretManagerAsync = rmDTSecretManagerAsync;
}
@Override
@@ -162,7 +163,7 @@ protected void serviceStart() throws Exception {
this.server =
rpc.getServer(ApplicationClientProtocol.class, this,
clientBindAddress,
- conf, this.rmDTSecretManager,
+ conf, this.rmDTSecretManagerAsync.getRMDelegationTokenSecretManager(),
conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
@@ -659,20 +660,32 @@ public GetDelegationTokenResponse getDelegationToken(
if (ugi.getRealUser() != null) {
realUser = new Text(ugi.getRealUser().getUserName());
}
- RMDelegationTokenIdentifier tokenIdentifier =
+ RMDelegationTokenIdentifier rmDTIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
realUser);
- Token realRMDTtoken =
- new Token(tokenIdentifier,
- this.rmDTSecretManager);
- response.setRMDelegationToken(
- BuilderUtils.newDelegationToken(
- realRMDTtoken.getIdentifier(),
- realRMDTtoken.getKind().toString(),
- realRMDTtoken.getPassword(),
- realRMDTtoken.getService().toString()
- ));
- return response;
+ RMDTOperationState state =
+ rmDTSecretManagerAsync.createTokenAsync(rmDTIdentifier);
+ if (state == RMDTOperationState.IN_PROFESS) {
+ response.setIsObtained(false);
+ return response;
+ } else {
+ Token token =
+ rmDTSecretManagerAsync.pollToken(rmDTIdentifier);
+ // in case the operation cleanup thread has cleaned the result before polling
+ if (token == null) {
+ response.setIsObtained(false);
+ return response;
+ } else {
+ response.setIsObtained(true);
+ response.setRMDelegationToken(
+ BuilderUtils.newDelegationToken(
+ token.getIdentifier(),
+ token.getKind().toString(),
+ token.getPassword(),
+ token.getService().toString()));
+ return response;
+ }
+ }
} catch(IOException io) {
throw RPCUtil.getRemoteException(io);
}
@@ -693,11 +706,25 @@ public RenewDelegationTokenResponse renewDelegationToken(
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = getRenewerForToken(token);
- long nextExpTime = rmDTSecretManager.renewToken(token, user);
+ RMDTOperationState state =
+ rmDTSecretManagerAsync.renewTokenAsync(token, user);
RenewDelegationTokenResponse renewResponse = Records
.newRecord(RenewDelegationTokenResponse.class);
- renewResponse.setNextExpirationTime(nextExpTime);
- return renewResponse;
+ if (state == RMDTOperationState.IN_PROFESS) {
+ renewResponse.setIsRenewed(false);
+ return renewResponse;
+ } else {
+ // in case the operation cleanup thread has cleaned the result before polling
+ Long renewTime = rmDTSecretManagerAsync.pollRenewTime(token);
+ if (renewTime == null) {
+ renewResponse.setIsRenewed(false);
+ return renewResponse;
+ } else {
+ renewResponse.setIsRenewed(true);
+ renewResponse.setNextExpirationTime(renewTime.longValue());
+ return renewResponse;
+ }
+ }
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
@@ -717,8 +744,18 @@ public CancelDelegationTokenResponse cancelDelegationToken(
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = getRenewerForToken(token);
- rmDTSecretManager.cancelToken(token, user);
- return Records.newRecord(CancelDelegationTokenResponse.class);
+ RMDTOperationState state =
+ rmDTSecretManagerAsync.cancelTokenAsync(token, user);
+ CancelDelegationTokenResponse cancelResponse =
+ Records.newRecord(CancelDelegationTokenResponse.class);
+ if (state == RMDTOperationState.IN_PROFESS) {
+ cancelResponse.setIsCanceled(false);
+ return cancelResponse;
+ } else {
+ rmDTSecretManagerAsync.confirmCancellation(token);
+ cancelResponse.setIsCanceled(true);
+ return cancelResponse;
+ }
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 79fb5df..019823b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
/**
* Context of the ResourceManager.
@@ -88,10 +88,10 @@
void setClientRMService(ClientRMService clientRMService);
- RMDelegationTokenSecretManager getRMDelegationTokenSecretManager();
+ RMDelegationTokenSecretManagerAsync getRMDelegationTokenSecretManagerAsync();
- void setRMDelegationTokenSecretManager(
- RMDelegationTokenSecretManager delegationTokenSecretManager);
+ void setRMDelegationTokenSecretManagerAsync(
+ RMDelegationTokenSecretManagerAsync delegationTokenSecretManagerAsync);
RMApplicationHistoryWriter getRMApplicationHistoryWriter();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 689a091..102ac3f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -42,7 +42,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
import com.google.common.annotations.VisibleForTesting;
@@ -74,7 +74,7 @@
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private AdminService adminService;
private ClientRMService clientRMService;
- private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
+ private RMDelegationTokenSecretManagerAsync rmDelegationTokenSecretManagerAsync;
private ResourceScheduler scheduler;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
@@ -250,14 +250,16 @@ public void setClientRMService(ClientRMService clientRMService) {
}
@Override
- public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
- return this.rmDelegationTokenSecretManager;
+ public RMDelegationTokenSecretManagerAsync
+ getRMDelegationTokenSecretManagerAsync() {
+ return this.rmDelegationTokenSecretManagerAsync;
}
@Override
- public void setRMDelegationTokenSecretManager(
- RMDelegationTokenSecretManager delegationTokenSecretManager) {
- this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
+ public void setRMDelegationTokenSecretManagerAsync(
+ RMDelegationTokenSecretManagerAsync delegationTokenSecretManagerAsync) {
+ this.rmDelegationTokenSecretManagerAsync =
+ delegationTokenSecretManagerAsync;
}
void setContainerAllocationExpirer(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java
index 9fdde65..ed9eab3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -28,8 +29,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
public class RMSecretManagerService extends AbstractService {
@@ -38,6 +40,7 @@
ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
RMContainerTokenSecretManager containerTokenSecretManager;
RMDelegationTokenSecretManager rmDTSecretManager;
+ RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync;
RMContextImpl rmContext;
@@ -65,11 +68,14 @@ public RMSecretManagerService(Configuration conf, RMContextImpl rmContext) {
rmDTSecretManager =
createRMDelegationTokenSecretManager(conf, rmContext);
- rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
+ rmDTSecretManagerAsync =
+ new RMDelegationTokenSecretManagerAsync(rmDTSecretManager);
+ rmContext.setRMDelegationTokenSecretManagerAsync(rmDTSecretManagerAsync);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
+ rmDTSecretManagerAsync.init(conf);
super.serviceInit(conf);
}
@@ -84,6 +90,7 @@ public void serviceStart() throws Exception {
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
}
+ rmDTSecretManagerAsync.start();
super.serviceStart();
}
@@ -92,6 +99,9 @@ public void serviceStop() throws Exception {
if (rmDTSecretManager != null) {
rmDTSecretManager.stopThreads();
}
+ if (rmDTSecretManagerAsync != null) {
+ rmDTSecretManagerAsync.stop();
+ }
if (amRmTokenSecretManager != null) {
amRmTokenSecretManager.stop();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index b0fa8f3..a768856 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -87,7 +87,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
@@ -922,7 +922,7 @@ protected ResourceTrackerService createResourceTrackerService() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
this.applicationACLsManager, this.queueACLsManager,
- getRMDTSecretManager());
+ getRMDTSecretManagerAsync());
}
protected ApplicationMasterService createApplicationMasterService() {
@@ -991,14 +991,15 @@ public AMRMTokenSecretManager getAMRMTokenSecretManager(){
}
@Private
- public RMDelegationTokenSecretManager getRMDTSecretManager(){
- return this.rmContext.getRMDelegationTokenSecretManager();
+ public RMDelegationTokenSecretManagerAsync getRMDTSecretManagerAsync(){
+ return this.rmContext.getRMDelegationTokenSecretManagerAsync();
}
@Override
public void recover(RMState state) throws Exception {
// recover RMdelegationTokenSecretManager
- getRMDTSecretManager().recover(state);
+ getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager()
+ .recover(state);
// recover applications
rmAppManager.recover(state);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index ce9f7ae..b168610 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -145,7 +145,8 @@ protected void serviceStart() throws Exception {
}
// enable RM to short-circuit token operations directly to itself
RMDelegationTokenIdentifier.Renewer.setSecretManager(
- rmContext.getRMDelegationTokenSecretManager(),
+ rmContext.getRMDelegationTokenSecretManagerAsync()
+ .getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
serviceStateLock.writeLock().lock();
isServiceStarted = true;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java
new file mode 100644
index 0000000..c81df37
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMDelegationTokenEvent extends
+ AbstractEvent {
+
+ private RMDelegationTokenIdentifier rmDTIdentifier;
+ private Token token;
+ private String user;
+
+ public RMDelegationTokenEvent(RMDelegationTokenEventType type,
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Token token, String user) {
+ super(type);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.token = token;
+ this.user = user;
+ }
+
+ public RMDelegationTokenIdentifier getRMDelegationTokenIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Token getToken() {
+ return token;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java
new file mode 100644
index 0000000..275d7a8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+
+public enum RMDelegationTokenEventType {
+ CREATE_RMDT,
+ CANCEL_RMDT,
+ RENEW_RMDT
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
index 23939de..fd90043 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
@@ -75,6 +75,13 @@ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
}
@Override
+ @VisibleForTesting
+ public synchronized byte[] createPassword(
+ RMDelegationTokenIdentifier identifier) {
+ return super.createPassword(identifier);
+ }
+
+ @Override
public RMDelegationTokenIdentifier createIdentifier() {
return new RMDelegationTokenIdentifier();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManagerAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManagerAsync.java
new file mode 100644
index 0000000..dfc3b39
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManagerAsync.java
@@ -0,0 +1,448 @@
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link RMDelegationTokenSecretManagerAsync} handles the RPC requests of
+ * delegation tokens on a separate thread other than the main thread of
+ * ResourceManager dispatcher.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RMDelegationTokenSecretManagerAsync extends AbstractService {
+
+ private static final Log LOG =
+ LogFactory.getLog(RMDelegationTokenSecretManagerAsync.class);
+
+ private AsyncDispatcher dispatcher;
+ private RMDelegationTokenSecretManager manager;
+ @VisibleForTesting
+ ConcurrentMap operations =
+ new ConcurrentHashMap();
+
+ private long operationsCleanupInterval;
+ private long operationsCleanupDelay;
+ private Thread operationsCleaner;
+
+ public RMDelegationTokenSecretManagerAsync(
+ RMDelegationTokenSecretManager manager) {
+ super(RMDelegationTokenSecretManagerAsync.class.getName());
+ this.manager = manager;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ // create async handler
+ dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.register(
+ RMDelegationTokenEventType.class, new ForwardingEventHandler());
+ dispatcher.setDrainEventsOnStop();
+
+ // create cache cleaner
+ operationsCleanupInterval =
+ conf.getLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS);
+ operationsCleanupDelay =
+ conf.getLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS);
+ operationsCleaner = new Daemon(new Runnable() {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(operationsCleanupInterval);
+ } catch (InterruptedException e) {
+ LOG.warn(
+ "The thread of cleanup finished delegation token operations is interrupted");
+ return;
+ }
+ for (RMDTOperationKey key : operations.keySet()) {
+ RMDTOperationValue value = operations.get(key);
+ if (value != null && value.state == RMDTOperationState.FINISHED) {
+ long delay = System.currentTimeMillis() - value.timestamp;
+ if (delay >= operationsCleanupDelay) {
+ operations.remove(key);
+ }
+ }
+ }
+ }
+ }
+ });
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ dispatcher.start();
+ operationsCleaner.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ if (operationsCleaner != null) {
+ operationsCleaner.interrupt();
+ operationsCleaner.join();
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * Non-blocking API
+ * Generate a new delegation token
+ */
+ @SuppressWarnings("unchecked")
+ public RMDTOperationState createTokenAsync(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ RMDTOperationKey key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.GET);
+ RMDTOperationValue value =
+ new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null);
+ value = operations.putIfAbsent(key, value);
+ if (value == null) {
+ dispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.CREATE_RMDT,
+ rmDTIdentifier, null, null));
+ return RMDTOperationState.IN_PROFESS;
+ } else {
+ return value.state;
+ }
+ }
+
+ /**
+ * Non-blocking API
+ * Cancel a delegation token
+ */
+ @SuppressWarnings("unchecked")
+ public RMDTOperationState cancelTokenAsync(
+ Token token, String canceller)
+ throws IOException {
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ RMDTOperationKey key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.CANCEL);
+ RMDTOperationValue value =
+ new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null);
+ value = operations.putIfAbsent(key, value);
+ if (value == null) {
+ dispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.CANCEL_RMDT,
+ rmDTIdentifier, token, canceller));
+ return RMDTOperationState.IN_PROFESS;
+ } else {
+ return value.state;
+ }
+ }
+
+ /**
+ * Non-blocking API
+ * Renew a delegation token
+ */
+ @SuppressWarnings("unchecked")
+ public RMDTOperationState renewTokenAsync(
+ Token token,
+ String renewer) throws IOException {
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ RMDTOperationKey key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.RENEW);
+ RMDTOperationValue value =
+ new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null);
+ value = operations.putIfAbsent(key, value);
+ if (value == null) {
+ dispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.RENEW_RMDT,
+ rmDTIdentifier, token, renewer));
+ return RMDTOperationState.IN_PROFESS;
+ } else {
+ return value.state;
+ }
+ }
+
+ /**
+ * Get the delegation token and remove the getting operation cache from
+ * {@link RMDelegationTokenSecretManagerAsync} memory
+ */
+ @SuppressWarnings("unchecked")
+ public Token pollToken(
+ RMDelegationTokenIdentifier rmDTIdentifier) throws IOException {
+ Object obj =
+ pollRMDTOperationResult(new RMDTOperationKey(rmDTIdentifier.getOwner(),
+ rmDTIdentifier.getRealUser(), rmDTIdentifier.getRenewer(),
+ RMDTOperationType.GET));
+ if (obj == null) {
+ return null;
+ } else if (obj instanceof Token>) {
+ return (Token) obj;
+ } else {
+ throw new IOException("The instance of " + obj.getClass().getName()
+ + " is not expected result");
+ }
+ }
+
+ /**
+ * Confirm {@link ClientRMService} is aware the the completion of token
+ * cancellation and remove the cancellation operation cache from
+ * {@link RMDelegationTokenSecretManagerAsync} memory
+ */
+ public void confirmCancellation(Token token)
+ throws IOException {
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ pollRMDTOperationResult(new RMDTOperationKey(rmDTIdentifier.getOwner(),
+ rmDTIdentifier.getRealUser(), rmDTIdentifier.getRenewer(),
+ RMDTOperationType.CANCEL));
+ }
+
+ /**
+ * Get the renew time and remove the renewing operation cache from
+ * {@link RMDelegationTokenSecretManagerAsync} memory
+ */
+ public Long pollRenewTime(Token token)
+ throws IOException {
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ Object obj =
+ pollRMDTOperationResult(new RMDTOperationKey(rmDTIdentifier.getOwner(),
+ rmDTIdentifier.getRealUser(), rmDTIdentifier.getRenewer(),
+ RMDTOperationType.RENEW));
+ if (obj == null) {
+ return null;
+ } else if (obj instanceof Long) {
+ return (Long) obj;
+ } else {
+ throw new IOException("The instance of " + obj.getClass().getName()
+ + " is not expected result");
+ }
+ }
+
+ /**
+ * Get {@link RMDelegationTokenSecretManager}
+ */
+ public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
+ return manager;
+ }
+
+ private Object pollRMDTOperationResult(RMDTOperationKey key)
+ throws IOException {
+ RMDTOperationValue value = operations.remove(key);
+ if (value == null) {
+ return null;
+ } else {
+ if (value.exception == null) {
+ return value.result;
+ } else {
+ throw value.exception;
+ }
+ }
+ }
+
+ private void handleRMDelegationTokenEvent(RMDelegationTokenEvent event) {
+ RMDelegationTokenIdentifier rmDTIdentifier =
+ event.getRMDelegationTokenIdentifier();
+ RMDTOperationKey key;
+ RMDTOperationValue value;
+ Token token;
+ switch (event.getType()) {
+ case CREATE_RMDT:
+ token = new Token(
+ event.getRMDelegationTokenIdentifier(), manager);
+ key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.GET);
+ value = operations.get(key);
+ if (value != null) {
+ value.state = RMDTOperationState.FINISHED;
+ value.timestamp = System.currentTimeMillis();
+ value.result = token;
+ }
+ break;
+ case CANCEL_RMDT:
+ try {
+ token = event.getToken();
+ String canceller = event.getUser();
+ manager.cancelToken(token, canceller);
+ key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.CANCEL);
+ value = operations.get(key);
+ if (value != null) {
+ value.state = RMDTOperationState.FINISHED;
+ value.timestamp = System.currentTimeMillis();
+ }
+ } catch (IOException e) {
+ LOG.error("Error when cancelling the token.", e);
+ key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.CANCEL);
+ value = operations.get(key);
+ if (value != null) {
+ value.state = RMDTOperationState.FINISHED;
+ value.timestamp = System.currentTimeMillis();
+ value.exception = e;
+ }
+ }
+ break;
+ case RENEW_RMDT:
+ try {
+ token = event.getToken();
+ String renewer = event.getUser();
+ long renewTime = manager.renewToken(token, renewer);
+ key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.RENEW);
+ value = operations.get(key);
+ if (value != null) {
+ value.state = RMDTOperationState.FINISHED;
+ value.timestamp = System.currentTimeMillis();
+ value.result = new Long(renewTime);
+ }
+ } catch (IOException e) {
+ LOG.error("Error when renewing the token.", e);
+ key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), RMDTOperationType.RENEW);
+ value = operations.get(key);
+ if (value != null) {
+ value.state = RMDTOperationState.FINISHED;
+ value.timestamp = System.currentTimeMillis();
+ value.exception = e;
+ }
+ }
+ break;
+ default:
+ LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
+ }
+ }
+
+ /**
+ * EventHandler implementation which forward events to
+ * {@link RMDelegationTokenSecretManagerAsync} This hides the EventHandle
+ * methods of the store from its public interface
+ */
+ private final class ForwardingEventHandler
+ implements EventHandler {
+
+ @Override
+ public void handle(RMDelegationTokenEvent event) {
+ handleRMDelegationTokenEvent(event);
+ }
+ }
+
+ @VisibleForTesting
+ static class RMDTOperationKey {
+
+ private Text owner;
+ private Text realUser;
+ private Text renewer;
+ private RMDTOperationType type;
+
+ public RMDTOperationKey(Text owner, Text realUser, Text renewer,
+ RMDTOperationType type) {
+ super();
+ this.owner = owner;
+ this.realUser = realUser;
+ this.renewer = renewer;
+ this.type = type;
+ }
+
+ @Override
+ public int hashCode() {
+ // generated by eclipse
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((owner == null) ? 0 : owner.hashCode());
+ result = prime * result + ((realUser == null) ? 0 : realUser.hashCode());
+ result = prime * result + ((renewer == null) ? 0 : renewer.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // generated by eclipse
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ RMDTOperationKey other = (RMDTOperationKey) obj;
+ if (owner == null) {
+ if (other.owner != null)
+ return false;
+ } else if (!owner.equals(other.owner))
+ return false;
+ if (realUser == null) {
+ if (other.realUser != null)
+ return false;
+ } else if (!realUser.equals(other.realUser))
+ return false;
+ if (renewer == null) {
+ if (other.renewer != null)
+ return false;
+ } else if (!renewer.equals(other.renewer))
+ return false;
+ if (type != other.type)
+ return false;
+ return true;
+ }
+
+ }
+
+ @VisibleForTesting
+ static class RMDTOperationValue {
+
+ @VisibleForTesting
+ RMDTOperationState state;
+ private Object result;
+ private IOException exception;
+ @VisibleForTesting
+ long timestamp;
+
+ public RMDTOperationValue(RMDTOperationState state, Object result,
+ IOException exception) {
+ super();
+ this.state = state;
+ this.result = result;
+ this.exception = exception;
+ timestamp = 0L;
+ }
+
+ }
+
+ @VisibleForTesting
+ static enum RMDTOperationType {
+ GET, CANCEL, RENEW
+ }
+
+ public static enum RMDTOperationState {
+ IN_PROFESS, FINISHED
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 63efe8f..81848b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -361,7 +361,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId)
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, queueACLsManager,
- getRMDTSecretManager()) {
+ getRMDTSecretManagerAsync()) {
@Override
protected void serviceStart() {
// override to not start rpc handler
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java
index b400e4f..6d6e391 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java
@@ -86,7 +86,7 @@ public void setup() throws InterruptedException, IOException {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), this.scheduler,
this.rmAppManager, this.applicationACLsManager,
- this.queueACLsManager, getRMDTSecretManager());
+ this.queueACLsManager, getRMDTSecretManagerAsync());
};
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 7c49681..7fdd22b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -41,10 +41,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
-import com.google.common.collect.Sets;
import junit.framework.Assert;
-import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -54,15 +52,20 @@
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -75,7 +78,6 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
@@ -96,6 +98,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -104,6 +107,8 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Sets;
+
public class TestClientRMService {
private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
@@ -114,6 +119,7 @@
private String appType = "MockApp";
private static RMDelegationTokenSecretManager dtsm;
+ private static RMDelegationTokenSecretManagerAsync dtsmAsync;
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
@@ -124,6 +130,9 @@ public static void setupSecretManager() throws IOException {
when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
dtsm.startThreads();
+ dtsmAsync = new RMDelegationTokenSecretManagerAsync(dtsm);
+ dtsmAsync.init(new YarnConfiguration());
+ dtsmAsync.start();
}
@AfterClass
@@ -131,6 +140,9 @@ public static void teardownSecretManager() {
if (dtsm != null) {
dtsm.stopThreads();
}
+ if (dtsmAsync != null) {
+ dtsmAsync.stop();
+ }
}
@Test
@@ -139,7 +151,7 @@ public void testGetClusterNodes() throws Exception {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
- this.getRMDTSecretManager());
+ this.getRMDTSecretManagerAsync());
};
};
rm.start();
@@ -326,7 +338,8 @@ public Void run() throws Exception {
}
private void checkTokenRenewal(UserGroupInformation owner,
- UserGroupInformation renewer) throws IOException, YarnException {
+ UserGroupInformation renewer)
+ throws IOException, YarnException, InterruptedException {
RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(
new Text(owner.getUserName()), new Text(renewer.getUserName()), null);
@@ -341,8 +354,84 @@ private void checkTokenRenewal(UserGroupInformation owner,
RMContext rmContext = mock(RMContext.class);
ClientRMService rmService = new ClientRMService(
- rmContext, null, null, null, null, dtsm);
- rmService.renewDelegationToken(request);
+ rmContext, null, null, null, null, dtsmAsync);
+ while (true) {
+ if (rmService.renewDelegationToken(request).getIsRenewed()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testGetRnewCancelDelegationToken() throws Exception {
+ RMContext rmContext = mock(RMContext.class);
+ final ClientRMService rmService = new ClientRMService(
+ rmContext, null, null, null, null, dtsmAsync);
+
+ int count = 0;
+ int maxCount = 10;
+ GetDelegationTokenResponse getResp = null;
+ for (; count < maxCount; ++count) {
+ getResp = owner.doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public GetDelegationTokenResponse run() throws Exception {
+ GetDelegationTokenRequest request =
+ GetDelegationTokenRequest.newInstance(owner.getUserName());
+ return rmService.getDelegationToken(request);
+ }
+ });
+ if (getResp.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertTrue(count > 0);
+ Assert.assertTrue(count < 10);
+
+ final org.apache.hadoop.yarn.api.records.Token token =
+ getResp.getRMDelegationToken();
+ for (count = 0; count < maxCount; ++count) {
+ RenewDelegationTokenResponse renewResp =
+ owner.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RenewDelegationTokenResponse run() throws Exception {
+ RenewDelegationTokenRequest request =
+ RenewDelegationTokenRequest.newInstance(token);
+ return rmService.renewDelegationToken(request);
+ }
+ });
+ if (renewResp.getIsRenewed()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertTrue(count > 0);
+ Assert.assertTrue(count < 10);
+
+ for (count = 0; count < maxCount; ++count) {
+ CancelDelegationTokenResponse cancelResp =
+ owner.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public CancelDelegationTokenResponse run() throws Exception {
+ CancelDelegationTokenRequest request =
+ CancelDelegationTokenRequest.newInstance(token);
+ return rmService.cancelDelegationToken(request);
+ }
+ });
+ if (cancelResp.getIsCanceled()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertTrue(count > 0);
+ Assert.assertTrue(count < 10);
}
@Test (timeout = 30000)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
index d389c0e..091de58 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -65,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -82,7 +84,7 @@ public void resetSecretManager() {
RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
}
- @Test
+ @Test (timeout = 30000)
public void testDelegationToken() throws IOException, InterruptedException {
final YarnConfiguration conf = new YarnConfiguration();
@@ -103,9 +105,13 @@ public void testDelegationToken() throws IOException, InterruptedException {
LOG.info("Creating DelegationTokenSecretManager with initialInterval: "
+ initialInterval + ", maxLifetime: " + maxLifetime
+ ", renewInterval: " + renewInterval);
+ RMDelegationTokenSecretManagerAsync rmDtSecretManagerAsync =
+ new RMDelegationTokenSecretManagerAsync(rmDtSecretManager);
+ rmDtSecretManagerAsync.init(conf);
+ rmDtSecretManagerAsync.start();
final ClientRMService clientRMService = new ClientRMServiceForTest(conf,
- scheduler, rmDtSecretManager);
+ scheduler, rmDtSecretManagerAsync);
clientRMService.init(conf);
clientRMService.start();
@@ -225,6 +231,7 @@ public void testDelegationToken() throws IOException, InterruptedException {
} finally {
rmDtSecretManager.stopThreads();
+ rmDtSecretManagerAsync.stop();
// TODO PRECOMMIT Close proxies.
if (clientRMWithDT != null) {
RPC.stopProxy(clientRMWithDT);
@@ -362,8 +369,19 @@ public Server getServer(Class protocol, Object instance,
GetDelegationTokenRequest request = Records
.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewerString);
- return clientRMService.getDelegationToken(request)
- .getRMDelegationToken();
+ do {
+ GetDelegationTokenResponse response =
+ clientRMService.getDelegationToken(request);
+ if (response.getIsObtained()) {
+ return response.getRMDelegationToken();
+ } else {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ fail();
+ }
+ }
+ } while (true);
}
});
return token;
@@ -428,10 +446,10 @@ public ApplicationClientProtocol run() {
public ClientRMServiceForTest(Configuration conf,
ResourceScheduler scheduler,
- RMDelegationTokenSecretManager rmDTSecretManager) {
+ RMDelegationTokenSecretManagerAsync rmDTSecretManagerAsync) {
super(mock(RMContext.class), scheduler, mock(RMAppManager.class),
new ApplicationACLsManager(conf), new QueueACLsManager(scheduler,
- conf), rmDTSecretManager);
+ conf), rmDTSecretManagerAsync);
}
// Use a random port unless explicitly specified.
@@ -443,8 +461,9 @@ InetSocketAddress getBindAddress(Configuration conf) {
@Override
protected void serviceStop() throws Exception {
- if (rmDTSecretManager != null) {
- rmDTSecretManager.stopThreads();
+ if (rmDTSecretManagerAsync != null) {
+ rmDTSecretManagerAsync.getRMDelegationTokenSecretManager().stopThreads();
+ rmDTSecretManagerAsync.stop();
}
super.serviceStop();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index dff9019..dcab9b6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -51,6 +51,8 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -59,6 +61,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -1045,7 +1049,7 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
userText1);
Token token1 =
new Token(dtId1,
- rm1.getRMDTSecretManager());
+ rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager());
SecurityUtil.setTokenService(token1, rmAddr);
ts.addToken(userText1, token1);
tokenSet.add(token1);
@@ -1056,7 +1060,7 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
userText2);
Token token2 =
new Token(dtId2,
- rm1.getRMDTSecretManager());
+ rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager());
SecurityUtil.setTokenService(token2, rmAddr);
ts.addToken(userText2, token2);
tokenSet.add(token2);
@@ -1205,7 +1209,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
rm2.stop();
}
- @Test
+ @Test (timeout = 30000)
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(
@@ -1236,8 +1240,15 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
GetDelegationTokenRequest.newInstance("renewer1");
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
AuthMethod.KERBEROS);
- GetDelegationTokenResponse response1 =
- rm1.getClientRMService().getDelegationToken(request1);
+ GetDelegationTokenResponse response1;
+ do {
+ response1 = rm1.getClientRMService().getDelegationToken(request1);
+ if (response1.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
response1.getRMDelegationToken();
Token token1 =
@@ -1258,25 +1269,35 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
Assert.assertNotNull(appState);
// assert all master keys are saved
- Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys();
+ Set allKeysRM1 = rm1.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager().getAllMasterKeys();
Assert.assertEquals(allKeysRM1, rmDTMasterKeyState);
// assert all tokens are saved
Map allTokensRM1 =
- rm1.getRMDTSecretManager().getAllTokens();
+ rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager()
+ .getAllTokens();
Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
Assert.assertEquals(allTokensRM1, rmDTState);
// assert sequence number is saved
Assert.assertEquals(
- rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+ rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager()
+ .getLatestDTSequenceNumber(),
rmState.getRMDTSecretManagerState().getDTSequenceNumber());
// request one more token
GetDelegationTokenRequest request2 =
GetDelegationTokenRequest.newInstance("renewer2");
- GetDelegationTokenResponse response2 =
- rm1.getClientRMService().getDelegationToken(request2);
+ GetDelegationTokenResponse response2;
+ do {
+ response2 = rm1.getClientRMService().getDelegationToken(request2);
+ if (response2.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
org.apache.hadoop.yarn.api.records.Token delegationToken2 =
response2.getRMDelegationToken();
Token token2 =
@@ -1284,16 +1305,21 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
// cancel token2
- try{
- rm1.getRMDTSecretManager().cancelToken(token2,
- UserGroupInformation.getCurrentUser().getUserName());
- } catch(Exception e) {
- Assert.fail();
- }
+ CancelDelegationTokenRequest request3 =
+ CancelDelegationTokenRequest.newInstance(delegationToken2);
+ do {
+ CancelDelegationTokenResponse response3 =
+ rm1.getClientRMService().cancelDelegationToken(request3);
+ if (response3.getIsCanceled()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
// Assert the token which has the latest delegationTokenSequenceNumber is removed
- Assert.assertEquals(
- rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+ Assert.assertEquals(rm1.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager().getLatestDTSequenceNumber(),
dtId2.getSequenceNumber());
Assert.assertFalse(rmDTState.containsKey(dtId2));
@@ -1303,29 +1329,40 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// assert master keys and tokens are populated back to DTSecretManager
Map allTokensRM2 =
- rm2.getRMDTSecretManager().getAllTokens();
+ rm2.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager()
+ .getAllTokens();
Assert.assertEquals(allTokensRM2.keySet(), allTokensRM1.keySet());
// rm2 has its own master keys when it starts, we use containsAll here
- Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
- .containsAll(allKeysRM1));
+ Assert.assertTrue(rm2.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager() .getAllMasterKeys()
+ .containsAll(allKeysRM1));
// assert sequenceNumber is properly recovered,
// even though the token which has max sequenceNumber is not stored
- Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
- rm2.getRMDTSecretManager().getLatestDTSequenceNumber());
+ Assert.assertEquals(rm1.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager().getLatestDTSequenceNumber(),
+ rm2.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager().getLatestDTSequenceNumber());
// renewDate before renewing
Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
- try{
- // Sleep for one millisecond to make sure renewDataAfterRenew is greater
- Thread.sleep(1);
- // renew recovered token
- rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
- } catch(Exception e) {
- Assert.fail();
- }
+ // Sleep for one millisecond to make sure renewDataAfterRenew is greater
+ Thread.sleep(1);
+ // renew recovered token
+ RenewDelegationTokenRequest request4 =
+ RenewDelegationTokenRequest.newInstance(delegationToken1);
+ do {
+ RenewDelegationTokenResponse response4 =
+ rm2.getClientRMService().renewDelegationToken(request4);
+ if (response4.getIsRenewed()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
- allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
+ allTokensRM2 = rm2.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager().getAllTokens();
Long renewDateAfterRenew = allTokensRM2.get(dtId1);
// assert token is renewed
Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew);
@@ -1335,15 +1372,21 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// assert old token is removed from state store
Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew));
- try{
- rm2.getRMDTSecretManager().cancelToken(token1,
- UserGroupInformation.getCurrentUser().getUserName());
- } catch(Exception e) {
- Assert.fail();
- }
+ CancelDelegationTokenRequest request5 =
+ CancelDelegationTokenRequest.newInstance(delegationToken1);
+ do {
+ CancelDelegationTokenResponse response5 =
+ rm2.getClientRMService().cancelDelegationToken(request5);
+ if (response5.getIsCanceled()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
// assert token is removed from state after its cancelled
- allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
+ allTokensRM2 = rm2.getRMDTSecretManagerAsync()
+ .getRMDelegationTokenSecretManager().getAllTokens();
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
Assert.assertFalse(rmDTState.containsKey(dtId1));
@@ -1354,7 +1397,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// This is to test submit an application to the new RM with the old delegation
// token got from previous RM.
- @Test
+ @Test (timeout = 30000)
public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -1372,8 +1415,15 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
GetDelegationTokenRequest.newInstance("renewer1");
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
AuthMethod.KERBEROS);
- GetDelegationTokenResponse response1 =
- rm1.getClientRMService().getDelegationToken(request1);
+ GetDelegationTokenResponse response1;
+ do {
+ response1 = rm1.getClientRMService().getDelegationToken(request1);
+ if (response1.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
Token token1 =
ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
@@ -1762,7 +1812,7 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) {
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
- rmAppManager, applicationACLsManager, null, getRMDTSecretManager()){
+ rmAppManager, applicationACLsManager, null, getRMDTSecretManagerAsync()){
@Override
protected void serviceStart() throws Exception {
// do nothing
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java
index 6a209e7..f1e47a3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java
@@ -168,7 +168,7 @@ public void testClientToAMTokens() throws Exception {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
- getRMDTSecretManager());
+ getRMDTSecretManagerAsync());
};
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
index 3b5add8..6105a68 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
@@ -1,24 +1,26 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.security;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
@@ -27,6 +29,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -43,6 +46,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationKey;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManagerAsync.RMDTOperationValue;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -84,7 +91,8 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
// the other is created on the first run of
// tokenRemoverThread.rollMasterKey()
- RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+ RMDelegationTokenSecretManager dtSecretManager =
+ rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager();
// assert all master keys are saved
Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
Set expiringKeys = new HashSet();
@@ -97,17 +105,24 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
// request to generate a RMDelegationToken
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer1");
- GetDelegationTokenResponse response =
- rm1.getClientRMService().getDelegationToken(request);
- org.apache.hadoop.yarn.api.records.Token delegationToken =
- response.getRMDelegationToken();
+ org.apache.hadoop.yarn.api.records.Token delegationToken;
+ do {
+ GetDelegationTokenResponse response =
+ rm1.getClientRMService().getDelegationToken(request);
+ if (response.getIsObtained()) {
+ delegationToken = response.getRMDelegationToken();
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
Token token1 =
ConverterUtils.convertFromYarn(delegationToken, null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
// wait for the first rollMasterKey
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
- .get() < 1){
+ .get() < 1) {
Thread.sleep(200);
}
@@ -120,7 +135,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
// wait for token to expire
// rollMasterKey is called every 1 second.
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
- .get() < 6) {
+ .get() < 6) {
Thread.sleep(200);
}
@@ -139,7 +154,8 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
- RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+ RMDelegationTokenSecretManager dtSecretManager =
+ rm1.getRMDTSecretManagerAsync().getRMDelegationTokenSecretManager();
// assert all master keys are saved
Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
@@ -160,6 +176,85 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRMDelegationTokenSecretManagerAsync() throws Exception {
+ RMDelegationTokenSecretManager dtSecretManager =
+ mock(RMDelegationTokenSecretManager.class);
+ when(dtSecretManager.createPassword(any(RMDelegationTokenIdentifier.class)))
+ .thenReturn(new byte[0]);
+ when(dtSecretManager.cancelToken(any(Token.class), any(String.class)))
+ .thenReturn(null);
+ when(dtSecretManager.renewToken(any(Token.class), any(String.class)))
+ .thenReturn(0L);
+ RMDelegationTokenSecretManagerAsync dtSecretManagerAsync =
+ new RMDelegationTokenSecretManagerAsync(dtSecretManager);
+ RMDelegationTokenIdentifier rmDTIdentifier =
+ new RMDelegationTokenIdentifier(new Text("owner"),
+ new Text("realUser"), new Text("renewer"));
+ dtSecretManagerAsync.init(new YarnConfiguration());
+ dtSecretManagerAsync.start();
+ // verify creating token is handled
+ dtSecretManagerAsync.createTokenAsync(rmDTIdentifier);
+ Thread.sleep(100);
+ verify(dtSecretManager).createPassword(rmDTIdentifier);
+ // verify renewing token is handled
+ Token token = mock(Token.class);
+ when(token.decodeIdentifier()).thenReturn(rmDTIdentifier);
+ dtSecretManagerAsync.renewTokenAsync(token, "renewer");
+ Thread.sleep(100);
+ verify(dtSecretManager).renewToken(token, "renewer");
+ // verify cancelling token is handled
+ dtSecretManagerAsync.cancelTokenAsync(token, "canceller");
+ Thread.sleep(100);
+ verify(dtSecretManager).cancelToken(token, "canceller");
+ dtSecretManagerAsync.stop();
+ dtSecretManagerAsync.close();
+ }
+
+ @Test
+ public void testCleanupOutstandingFinishedDTOperations() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ conf.setLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS,
+ 1000);
+ conf.setLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS,
+ 500);
+ MockRM rm1 = new MyMockRM(conf, memStore);
+ rm1.start();
+ RMDelegationTokenSecretManagerAsync dtSecretManagerAsync =
+ rm1.getRMDTSecretManagerAsync();
+
+ for (RMDTOperationType type : RMDTOperationType.values()) {
+ RMDelegationTokenIdentifier rmDTIdentifier =
+ new RMDelegationTokenIdentifier(
+ new Text("owner"), new Text("realUser"), new Text("renew"));
+ RMDTOperationKey key = new RMDTOperationKey(
+ rmDTIdentifier.getOwner(), rmDTIdentifier.getRealUser(),
+ rmDTIdentifier.getRenewer(), type);
+ RMDTOperationValue value =
+ new RMDTOperationValue(RMDTOperationState.IN_PROFESS, null, null);
+ dtSecretManagerAsync.operations.put(key, value);
+ }
+ Assert.assertEquals(RMDTOperationType.values().length,
+ dtSecretManagerAsync.operations.size());
+ for (RMDTOperationValue value : dtSecretManagerAsync.operations.values()) {
+ value.state = RMDTOperationState.FINISHED;
+ value.timestamp = System.currentTimeMillis();
+ }
+
+ // wait for cleanup
+ Thread.sleep(2000);
+
+ Assert.assertEquals(0, dtSecretManagerAsync.operations.size());
+
+ rm1.stop();
+ rm1.close();
+ }
+
class MyMockRM extends TestSecurityMockRM {
public MyMockRM(Configuration conf, RMStateStore store) {
@@ -172,8 +267,8 @@ protected RMSecretManagerService createRMSecretManagerService() {
@Override
protected RMDelegationTokenSecretManager
- createRMDelegationTokenSecretManager(Configuration conf,
- RMContext rmContext) {
+ createRMDelegationTokenSecretManager(Configuration conf,
+ RMContext rmContext) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
@@ -186,14 +281,15 @@ protected RMSecretManagerService createRMSecretManagerService() {
public class TestRMDelegationTokenSecretManager extends
RMDelegationTokenSecretManager {
+
public AtomicInteger numUpdatedKeys = new AtomicInteger(0);
public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
- delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
- rmContext);
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
+ rmContext);
}
@Override