diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index d892c5d..8899e8c 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -507,9 +507,7 @@ public void stopThreads() {
running = false;
if (tokenRemoverThread != null) {
- synchronized (noInterruptsLock) {
- tokenRemoverThread.interrupt();
- }
+ tokenRemoverThread.interrupt();
try {
tokenRemoverThread.join();
} catch (InterruptedException e) {
diff --git hadoop-common-project/hadoop-common/src/main/proto/Security.proto hadoop-common-project/hadoop-common/src/main/proto/Security.proto
index 5ff571d..e467383 100644
--- hadoop-common-project/hadoop-common/src/main/proto/Security.proto
+++ hadoop-common-project/hadoop-common/src/main/proto/Security.proto
@@ -43,7 +43,8 @@ message GetDelegationTokenRequestProto {
}
message GetDelegationTokenResponseProto {
- optional hadoop.common.TokenProto token = 1;
+ optional bool is_obtained = 1;
+ optional hadoop.common.TokenProto token = 2;
}
message RenewDelegationTokenRequestProto {
@@ -51,13 +52,15 @@ message RenewDelegationTokenRequestProto {
}
message RenewDelegationTokenResponseProto {
- required uint64 newExpiryTime = 1;
+ optional bool is_renewed = 1;
+ optional uint64 newExpiryTime = 2;
}
message CancelDelegationTokenRequestProto {
required hadoop.common.TokenProto token = 1;
}
-message CancelDelegationTokenResponseProto { // void response
+message CancelDelegationTokenResponseProto {
+ optional bool is_canceled = 1;
}
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
index aeb20cd..eec4537 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
@@ -104,6 +104,7 @@ public void testClusterGetDelegationToken() throws Exception {
rmDTToken.setKind("Testclusterkind");
rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
rmDTToken.setService("0.0.0.0:8032");
+ getDTResponse.setIsObtained(true);
getDTResponse.setRMDelegationToken(rmDTToken);
final ApplicationClientProtocol cRMProtocol = mock(ApplicationClientProtocol.class);
when(cRMProtocol.getDelegationToken(any(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 864980b..a4d5f01 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -296,13 +296,22 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
/**
*
The interface used by clients to get delegation token, enabling the
- * containers to be able to talk to the service using those tokens.
- *
- *
The ResourceManager responds with the delegation
- * {@link Token} that can be used by the client to speak to this
- * service.
+ * containers to be able to talk to the service using those tokens.
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of getting the delegation
+ * token is completed or not.
+ * - The {@link Token} that can be used by the client to speak to this
+ * service.
+ *
+ * Note: users have to wait until this flag becomes true to get the
+ * {@link Token}.
+ *
* @param request request to get a delegation token for the client.
- * @return delegation token that can be used to talk to this service
+ * @return the response including a flag and a delegation token that can be
+ * used to talk to this service
* @throws YarnException
* @throws IOException
*/
@@ -314,9 +323,20 @@ public GetDelegationTokenResponse getDelegationToken(
/**
* Renew an existing delegation {@link Token}.
- *
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of renewing the delegation
+ * token is completed or not.
+ * - The new expiry time for the delegation token.
+ *
+ * Note: users have to wait until this flag becomes true to get new expiry
+ * time.
+ *
* @param request the delegation token to be renewed.
- * @return the new expiry time for the delegation token.
+ * @return the response including a flag and a new expiry time for the
+ * delegation token.
* @throws YarnException
* @throws IOException
*/
@@ -328,9 +348,18 @@ public RenewDelegationTokenResponse renewDelegationToken(
/**
* Cancel an existing delegation {@link Token}.
- *
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of canceling the delegation
+ * token is completed or not.
+ *
+ * Note: users have to wait until this flag becomes true to confirm
+ * the delegation token is canceled.
+ *
* @param request the delegation token to be cancelled.
- * @return an empty response.
+ * @return the response including a flag.
* @throws YarnException
* @throws IOException
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
index 495d03e..d605c60 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -31,9 +33,27 @@
public abstract class CancelDelegationTokenResponse {
@Private
@Unstable
- public static CancelDelegationTokenResponse newInstance() {
+ public static CancelDelegationTokenResponse newInstance(boolean isCanceled) {
CancelDelegationTokenResponse response =
Records.newRecord(CancelDelegationTokenResponse.class);
+ response.setIsCanceled(isCanceled);
return response;
}
+
+ /**
+ * Get the flag which indicates that the process of canceling delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsCanceled();
+
+ /**
+ * Set the flag which indicates that the process of canceling delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsCanceled(boolean isCanceled);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
index bb7fcf0..9613619 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
@@ -39,14 +39,32 @@
@Private
@Unstable
- public static GetDelegationTokenResponse newInstance(Token rmDTToken) {
+ public static GetDelegationTokenResponse newInstance(
+ boolean isObtained, Token rmDTToken) {
GetDelegationTokenResponse response =
Records.newRecord(GetDelegationTokenResponse.class);
+ response.setIsObtained(isObtained);
response.setRMDelegationToken(rmDTToken);
return response;
}
/**
+ * Get the flag which indicates that the process of getting delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsObtained();
+
+ /**
+ * Set the flag which indicates that the process of getting delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsObtained(boolean isObtained);
+
+ /**
* The Delegation tokens have a identifier which maps to
* {@link AbstractDelegationTokenIdentifier}.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
index f0f7fad..5c449f1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -31,18 +33,43 @@
@Private
@Unstable
- public static RenewDelegationTokenResponse newInstance(long expTime) {
+ public static RenewDelegationTokenResponse newInstance(
+ boolean isRenewed, long expTime) {
RenewDelegationTokenResponse response =
Records.newRecord(RenewDelegationTokenResponse.class);
+ response.setIsRenewed(isRenewed);
response.setNextExpirationTime(expTime);
return response;
}
+ /**
+ * Get the flag which indicates that the process of renewing delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsRenewed();
+
+ /**
+ * Set the flag which indicates that the process of renewing delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsRenewed(boolean isRenewed);
+
+ /**
+ * Get the new expiration time.
+ */
@Private
@Unstable
public abstract long getNextExpirationTime();
+ /**
+ * Set the new expiration time
+ */
@Private
@Unstable
public abstract void setNextExpirationTime(long expTime);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 839765c..a6f8f61 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -507,12 +506,24 @@
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
-
+
/** Delegation Token renewer thread count */
public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
RM_PREFIX + "delegation-token-renewer.thread-count";
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
+ /** Interval at which the thread of cleanup delegation token operations runs */
+ public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS =
+ RM_PREFIX + "delegation-token-operation.cleanup-interval-ms";
+ public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS =
+ 60000l;
+
+ /** The max life time of a delegation token operation after its finish before its cleanup */
+ public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS =
+ RM_PREFIX + "delegation-token-operation.cleanup-delay-ms";
+ public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS =
+ 30000l;
+
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index d35e1a4..74e876a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -268,8 +268,25 @@ public Token getRMDelegationToken(Text renewer)
GetDelegationTokenRequest rmDTRequest =
Records.newRecord(GetDelegationTokenRequest.class);
rmDTRequest.setRenewer(renewer.toString());
- GetDelegationTokenResponse response =
- rmClient.getDelegationToken(rmDTRequest);
+
+ GetDelegationTokenResponse response;
+ int pollCount = 0;
+ while (true) {
+ response = rmClient.getDelegationToken(rmDTRequest);
+ if (response.getIsObtained()) {
+ break;
+ }
+ // Notify the client through the log every 10 poll, in case the client
+ // is blocked here too long.
+ if (++pollCount % 10 == 0) {
+ LOG.info("Get the RM delegation token is not finished");
+ }
+ try {
+ //TODO: need to replace statePollIntervalMillis after YARN-1446
+ Thread.sleep(statePollIntervalMillis);
+ } catch (InterruptedException ie) {
+ }
+ }
return response.getRMDelegationToken();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 826433d..525d5c4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -36,12 +36,15 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,18 +59,17 @@
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;
+
public class TestYarnClient {
@Test
@@ -90,7 +92,7 @@ public void testClientStop() {
}
@Test (timeout = 30000)
- public void testSubmitApplication() {
+ public void testSubmitApplication() throws Exception {
Configuration conf = new Configuration();
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
100); // speed up tests
@@ -128,6 +130,21 @@ public void testSubmitApplication() {
client.stop();
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testGetDelegationToken() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
+ 100); // speed up tests
+ final YarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+
+ client.getRMDelegationToken(new Text("test renewer"));
+ verify(((MockYarnClient) client).rmClient, times(5)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+ }
+
@Test(timeout = 30000)
public void testApplicationType() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@@ -240,6 +257,18 @@ public void start() {
Assert.fail("Exception is not expected.");
}
when(mockResponse.getApplicationReport()).thenReturn(mockReport);
+
+ try {
+ when(rmClient.getDelegationToken(
+ any(GetDelegationTokenRequest.class))).thenReturn(
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(false, null),
+ GetDelegationTokenResponse.newInstance(true, null));
+ } catch (Exception e) {
+ Assert.fail(); // shouldn't happen with Mockito
+ }
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
index ec2b2b2..791cc66 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
@@ -20,16 +20,20 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProtoOrBuilder;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
-public class CancelDelegationTokenResponsePBImpl extends CancelDelegationTokenResponse {
+public class CancelDelegationTokenResponsePBImpl
+ extends CancelDelegationTokenResponse {
CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto
.getDefaultInstance();
+ CancelDelegationTokenResponseProto.Builder builder = null;
+ boolean viaProto = false;
public CancelDelegationTokenResponsePBImpl() {
}
@@ -39,6 +43,19 @@ public CancelDelegationTokenResponsePBImpl(
this.proto = proto;
}
+ @Override
+ public boolean getIsCanceled() {
+ CancelDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsCanceled();
+ }
+
+ @Override
+ public void setIsCanceled(boolean isCanceled) {
+ maybeInitBuilder();
+ builder.setIsCanceled(isCanceled);
+ }
+
public CancelDelegationTokenResponseProto getProto() {
return proto;
}
@@ -62,4 +79,12 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = CancelDelegationTokenResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
index 93f4b5b..9502b78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
@@ -72,6 +72,19 @@ public void setRMDelegationToken(Token appToken) {
this.appToken = appToken;
}
+ @Override
+ public boolean getIsObtained() {
+ GetDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsObtained();
+ }
+
+ @Override
+ public void setIsObtained(boolean isObtained) {
+ maybeInitBuilder();
+ builder.setIsObtained(isObtained);
+ }
+
public GetDelegationTokenResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -128,4 +141,5 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
index 9d20b46..2007f5a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
@@ -89,4 +89,18 @@ public void setNextExpirationTime(long expTime) {
maybeInitBuilder();
builder.setNewExpiryTime(expTime);
}
+
+ @Override
+ public boolean getIsRenewed() {
+ RenewDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsRenewed();
+ }
+
+ @Override
+ public void setIsRenewed(boolean isRenewed) {
+ maybeInitBuilder();
+ builder.setIsRenewed(isRenewed);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c43dc1a..77e62e0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -437,6 +437,20 @@
+ Interval at which the thread of cleanup delegation token
+ operations runs
+ yarn.resourcemanager.delegation-token-operation.cleanup-interval-ms
+ 60000
+
+
+
+ The max life time of a delegation token operation after its
+ finish before its cleanup
+ yarn.resourcemanager.delegation-token-operation.cleanup-delay-ms
+ 30000
+
+
+
Interval for the roll over for the master key used to generate
application tokens
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index f0e8553..c297850 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -100,7 +100,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
/**
@@ -610,8 +609,6 @@ public GetDelegationTokenResponse getDelegationToken(
"Delegation Token can be issued only with kerberos authentication");
}
- GetDelegationTokenResponse response =
- recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Text owner = new Text(ugi.getUserName());
Text realUser = null;
@@ -621,17 +618,25 @@ public GetDelegationTokenResponse getDelegationToken(
RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
realUser);
- Token realRMDTtoken =
- new Token(tokenIdentifier,
- this.rmDTSecretManager);
- response.setRMDelegationToken(
- BuilderUtils.newDelegationToken(
+ switch (rmDTSecretManager.getGetDelegationTokenState(tokenIdentifier)) {
+ case NEW:
+ rmDTSecretManager.recordToken(tokenIdentifier,
+ new Token(
+ tokenIdentifier, rmDTSecretManager));
+ return GetDelegationTokenResponse.newInstance(false, null);
+ case PROGRESSING:
+ return GetDelegationTokenResponse.newInstance(false, null);
+ case FINISHED:
+ default:
+ Token realRMDTtoken =
+ rmDTSecretManager.pollGetDelegationTokenResponse(tokenIdentifier);
+ return GetDelegationTokenResponse.newInstance(true,
+ BuilderUtils.newDelegationToken(
realRMDTtoken.getIdentifier(),
realRMDTtoken.getKind().toString(),
realRMDTtoken.getPassword(),
- realRMDTtoken.getService().toString()
- ));
- return response;
+ realRMDTtoken.getService().toString()));
+ }
} catch(IOException io) {
throw RPCUtil.getRemoteException(io);
}
@@ -651,12 +656,20 @@ public RenewDelegationTokenResponse renewDelegationToken(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
- String user = getRenewerForToken(token);
- long nextExpTime = rmDTSecretManager.renewToken(token, user);
- RenewDelegationTokenResponse renewResponse = Records
- .newRecord(RenewDelegationTokenResponse.class);
- renewResponse.setNextExpirationTime(nextExpTime);
- return renewResponse;
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ switch (rmDTSecretManager.getRenewDelegationTokenState(rmDTIdentifier)) {
+ case NEW:
+ String user = getRenewerForToken(token);
+ rmDTSecretManager.renewToken(token, user);
+ return RenewDelegationTokenResponse.newInstance(false, Long.MIN_VALUE);
+ case PROGRESSING:
+ return RenewDelegationTokenResponse.newInstance(false, Long.MIN_VALUE);
+ case FINISHED:
+ default:
+ long expTime =
+ rmDTSecretManager.pollRenewDelegationTokenResponse(rmDTIdentifier);
+ return RenewDelegationTokenResponse.newInstance(true, expTime);
+ }
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
@@ -675,9 +688,19 @@ public CancelDelegationTokenResponse cancelDelegationToken(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
- String user = getRenewerForToken(token);
- rmDTSecretManager.cancelToken(token, user);
- return Records.newRecord(CancelDelegationTokenResponse.class);
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ switch (rmDTSecretManager.getCancelDelegationTokenState(rmDTIdentifier)) {
+ case NEW:
+ String user = getRenewerForToken(token);
+ rmDTSecretManager.cancelToken(token, user);
+ return CancelDelegationTokenResponse.newInstance(false);
+ case PROGRESSING:
+ return CancelDelegationTokenResponse.newInstance(false);
+ case FINISHED:
+ default:
+ rmDTSecretManager.pollCancelDelegationTokenResponse(rmDTIdentifier);
+ return CancelDelegationTokenResponse.newInstance(true);
+ }
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index e442077..6b13649 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -86,6 +86,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -435,8 +436,9 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
- rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
+ rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext, conf);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
+ rmDispatcher.register(RMDelegationTokenEventType.class, rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
@@ -944,7 +946,8 @@ protected ResourceTrackerService createResourceTrackerService() {
}
protected RMDelegationTokenSecretManager
- createRMDelegationTokenSecretManager(RMContext rmContext) {
+ createRMDelegationTokenSecretManager(RMContext rmContext,
+ Configuration conf) {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -956,7 +959,7 @@ protected ResourceTrackerService createResourceTrackerService() {
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
- tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
+ tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext, conf);
}
protected ClientRMService createClientRMService() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index a845264..e654e5f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -59,6 +59,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType;
@Private
@Unstable
@@ -431,18 +433,17 @@ protected abstract void updateApplicationAttemptStateInternal(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
/**
- * RMDTSecretManager call this to store the state of a delegation token
+ * Non-blocking
+ * API RMDTSecretManager call this to store the state of a delegation token
* and sequence number
*/
+ @SuppressWarnings("unchecked")
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) {
- try {
- storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
- latestSequenceNumber);
- } catch (Exception e) {
- notifyStoreOperationFailed(e);
- }
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreRMDelegationTokenAndSequenceNumberEvent(
+ rmDTIdentifier, renewDate, latestSequenceNumber));
}
/**
@@ -455,15 +456,29 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState(
int latestSequenceNumber) throws Exception;
/**
- * RMDTSecretManager call this to remove the state of a delegation token
+ * Non-blocking
+ * API RMDTSecretManager call this to update the state of a delegation token
+ * and sequence number
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void updateRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) {
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent(
+ rmDTIdentifier, renewDate, latestSequenceNumber));
+ }
+
+ /**
+ * Non-blocking
+ * API RMDTSecretManager call this to remove the state of a delegation token
*/
+ @SuppressWarnings("unchecked")
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
- try {
- removeRMDelegationTokenState(rmDTIdentifier);
- } catch (Exception e) {
- notifyStoreOperationFailed(e);
- }
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreRemoveRMDelegationTokenEvent(
+ rmDTIdentifier, sequenceNumber));
}
/**
@@ -665,6 +680,57 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
LOG.error("Error removing app: " + appId, e);
notifyStoreOperationFailed(e);
}
+ } else if (event.getType().equals(RMStateStoreEventType.STORE_RM_DT_SN)) {
+ RMStateStoreRMDelegationTokenAndSequenceNumberEvent rmDTAndSNEvent =
+ (RMStateStoreRMDelegationTokenAndSequenceNumberEvent) event;
+ Exception storedException = null;
+ LOG.info("Storing info for delegation token: " +
+ rmDTAndSNEvent.getRmDTIdentifier());
+ try {
+ storeRMDelegationTokenAndSequenceNumberState(
+ rmDTAndSNEvent.getRmDTIdentifier(), rmDTAndSNEvent.getRenewDate(),
+ rmDTAndSNEvent.getSequenceNumber());
+ notifyDoneStoringRMDelegationTokenAndSequenceNumber(
+ rmDTAndSNEvent.getRmDTIdentifier(), storedException);
+ } catch (Exception e) {
+ LOG.error("Error storing delegation token: " +
+ rmDTAndSNEvent.getRmDTIdentifier(), e);
+ notifyStoreOperationFailed(e);
+ }
+ } else if (event.getType().equals(RMStateStoreEventType.REMOVE_RM_DT_SN)) {
+ RMStateStoreRemoveRMDelegationTokenEvent removeRmDTEvent =
+ (RMStateStoreRemoveRMDelegationTokenEvent) event;
+ Exception removedException = null;
+ LOG.info("Removing info for delegation token: " +
+ removeRmDTEvent.getRmDTIdentifier());
+ try {
+ removeRMDelegationTokenState(removeRmDTEvent.getRmDTIdentifier());
+ notifyDoneRemovingRMDelegationToken(
+ removeRmDTEvent.getRmDTIdentifier(), removedException);
+ } catch (Exception e) {
+ LOG.error("Error removing delegation token: " +
+ removeRmDTEvent.getRmDTIdentifier(), e);
+ notifyStoreOperationFailed(e);
+ }
+ } else if (event.getType().equals(RMStateStoreEventType.UPDATE_RM_DT_SN)) {
+ RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent updateRmDTAndSNEvent =
+ (RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent) event;
+ Exception updatedException = null;
+ LOG.info("Updating info for delegation token: " +
+ updateRmDTAndSNEvent.getRmDTIdentifier());
+ try {
+ removeRMDelegationTokenState(updateRmDTAndSNEvent.getRmDTIdentifier());
+ storeRMDelegationTokenAndSequenceNumberState(
+ updateRmDTAndSNEvent.getRmDTIdentifier(),
+ updateRmDTAndSNEvent.getRenewDate(),
+ updateRmDTAndSNEvent.getSequenceNumber());
+ notifyDoneUpdatingRMDelegationTokenAndSequenceNumber(
+ updateRmDTAndSNEvent.getRmDTIdentifier(), updatedException);
+ } catch (Exception e) {
+ LOG.error("Error updating delegation token: " +
+ updateRmDTAndSNEvent.getRmDTIdentifier(), e);
+ notifyStoreOperationFailed(e);
+ }
} else {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
}
@@ -729,6 +795,60 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId
}
/**
+ * In (@link handleStoreEvent}, this method is called to notify the
+ * RMDelegationTokenSecretManager that new delegation token is stored in state
+ * store
+ * @param rmDTIdentifier identifier of the delegation token that has been saved
+ * @param storedException the exception that is thrown when storing the
+ * delegation token
+ */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneStoringRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Exception storedException) {
+ //TODO: YARN-1397 will clean the exception dependency
+ rmDispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_STORED,
+ rmDTIdentifier, storedException));
+ }
+
+ /**
+ * In (@link handleStoreEvent}, this method is called to notify the
+ * RMDelegationTokenSecretManager that new delegation token is removed from state
+ * store
+ * @param rmDTIdentifier identifier of the delegation token that has been removed
+ * @param storedException the exception that is thrown when removing the
+ * delegation token
+ */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneRemovingRMDelegationToken(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Exception storedException) {
+ //TODO: YARN-1397 will clean the exception dependency
+ rmDispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_REMOVED,
+ rmDTIdentifier, storedException));
+ }
+
+ /**
+ * In (@link handleStoreEvent}, this method is called to notify the
+ * RMDelegationTokenSecretManager that the delegation token is updated in state
+ * store
+ * @param rmDTIdentifier identifier of the delegation token that has been updated
+ * @param storedException the exception that is thrown when updating the
+ * delegation token
+ */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneUpdatingRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Exception updatedException) {
+ //TODO: YARN-1397 will clean the exception dependency
+ rmDispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_UPDATED,
+ rmDTIdentifier, updatedException));
+ }
+
+ /**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index 903f4e7..0339fa9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -23,5 +23,8 @@
STORE_APP,
UPDATE_APP,
UPDATE_APP_ATTEMPT,
- REMOVE_APP
+ REMOVE_APP,
+ STORE_RM_DT_SN,
+ REMOVE_RM_DT_SN,
+ UPDATE_RM_DT_SN
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java
new file mode 100644
index 0000000..7dc266f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMStateStoreRMDelegationTokenAndSequenceNumberEvent
+ extends RMStateStoreEvent {
+
+ private final RMDelegationTokenIdentifier rmDTIdentifier;
+ private final Long renewDate;
+ private final int sequenceNumber;
+
+ public RMStateStoreRMDelegationTokenAndSequenceNumberEvent(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Long renewDate, int sequenceNumber) {
+ super(RMStateStoreEventType.STORE_RM_DT_SN);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.renewDate = renewDate;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public RMDelegationTokenIdentifier getRmDTIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Long getRenewDate() {
+ return renewDate;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationTokenEvent.java
new file mode 100644
index 0000000..9d54373
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationTokenEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMStateStoreRemoveRMDelegationTokenEvent
+ extends RMStateStoreEvent {
+
+ private final RMDelegationTokenIdentifier rmDTIdentifier;
+ private final int sequenceNumber;
+
+ public RMStateStoreRemoveRMDelegationTokenEvent(
+ RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+ super(RMStateStoreEventType.REMOVE_RM_DT_SN);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public RMDelegationTokenIdentifier getRmDTIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java
new file mode 100644
index 0000000..47b713c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent
+ extends RMStateStoreEvent {
+
+ private final RMDelegationTokenIdentifier rmDTIdentifier;
+ private final Long renewDate;
+ private final int sequenceNumber;
+
+ public RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Long renewDate, int sequenceNumber) {
+ super(RMStateStoreEventType.UPDATE_RM_DT_SN);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.renewDate = renewDate;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public RMDelegationTokenIdentifier getRmDTIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Long getRenewDate() {
+ return renewDate;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java
new file mode 100644
index 0000000..b4ad11d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+
+public class RMDelegationTokenEvent extends
+ AbstractEvent {
+
+ private RMDelegationTokenIdentifier rmDTIdentifier;
+ private Exception e;
+
+ public RMDelegationTokenEvent(RMDelegationTokenEventType type,
+ RMDelegationTokenIdentifier rmDTIdentifier, Exception e) {
+ super(type);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.e = e;
+ }
+
+ public RMDelegationTokenIdentifier getRMDelegationTokenIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Exception getException() {
+ return e;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java
new file mode 100644
index 0000000..9ac2c2c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+
+public enum RMDelegationTokenEventType {
+ //Source: RMStateStore
+ DT_STORED,
+ DT_REMOVED,
+ DT_UPDATED
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
index 23939de..ee14833 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
@@ -23,15 +23,22 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -48,11 +55,16 @@
@InterfaceStability.Unstable
public class RMDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager implements
- Recoverable {
+ Recoverable, EventHandler{
private static final Log LOG = LogFactory
.getLog(RMDelegationTokenSecretManager.class);
protected final RMContext rmContext;
+ protected final ConcurrentMap dtOperations =
+ new ConcurrentHashMap();
+ protected final long dtOperationsCleanupInterval;
+ protected final long dtOperationsCleanupDelay;
+ protected final Thread dtOperationsCleaner;
/**
* Create a secret manager
@@ -63,15 +75,48 @@
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens
+ * @param rmContext
+ * @param conf
*/
public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval,
- RMContext rmContext) {
+ RMContext rmContext,
+ Configuration conf) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.rmContext = rmContext;
+ dtOperationsCleanupInterval = conf.getLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS);
+ dtOperationsCleanupDelay = conf.getLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS);
+ dtOperationsCleaner = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(dtOperationsCleanupInterval);
+ } catch (InterruptedException e) {
+ LOG.warn(
+ "The thread of cleanup finished delegation token operations is interrupted");
+ return;
+ }
+ for (DelegationTokenOperationKey key : dtOperations.keySet()) {
+ DelegationTokenOperationValue value = dtOperations.get(key);
+ if (value != null &&
+ value.getState() == DelegationTokenOperationState.FINISHED) {
+ long delay = System.currentTimeMillis() - value.getFinishTime();
+ if (delay >= dtOperationsCleanupDelay) {
+ dtOperations.remove(key);
+ }
+ }
+ }
+ }
+ }
+ });
}
@Override
@@ -122,9 +167,7 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id,
try {
LOG.info("updating RMDelegation token with sequence number: "
+ id.getSequenceNumber());
- rmContext.getStateStore().removeRMDelegationToken(id,
- delegationTokenSequenceNumber);
- rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
+ rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id,
renewDate, id.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
@@ -195,4 +238,268 @@ public void recover(RMState rmState) throws Exception {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
+
+ public DelegationTokenOperationState getGetDelegationTokenState(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ return getDelegationTokenOperationState(
+ rmDTIdentifier, DelegationTokenOperationType.GET);
+ }
+
+ public DelegationTokenOperationState getCancelDelegationTokenState(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ return getDelegationTokenOperationState(
+ rmDTIdentifier, DelegationTokenOperationType.CANCEL);
+ }
+
+ public DelegationTokenOperationState getRenewDelegationTokenState(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ return getDelegationTokenOperationState(
+ rmDTIdentifier, DelegationTokenOperationType.RENEW);
+ }
+
+ protected DelegationTokenOperationState getDelegationTokenOperationState(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ DelegationTokenOperationKey key = new DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ DelegationTokenOperationValue value = dtOperations.putIfAbsent(
+ key, new DelegationTokenOperationValue());
+ if (value == null) {
+ return DelegationTokenOperationState.NEW;
+ } else {
+ return value.getState();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public Token pollGetDelegationTokenResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ Object obj = pollDelegationTokenOperationResponse(
+ rmDTIdentifier, DelegationTokenOperationType.GET);
+ assert obj instanceof Token>;
+ return (Token) obj;
+ }
+
+ public void pollCancelDelegationTokenResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ pollDelegationTokenOperationResponse(
+ rmDTIdentifier, DelegationTokenOperationType.CANCEL);
+ }
+
+ public long pollRenewDelegationTokenResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ Object obj = pollDelegationTokenOperationResponse(
+ rmDTIdentifier, DelegationTokenOperationType.RENEW);
+ assert obj instanceof Long;
+ return (Long) obj;
+ }
+
+ protected Object pollDelegationTokenOperationResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ DelegationTokenOperationKey key = new DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ DelegationTokenOperationValue value = dtOperations.remove(key);
+ if (value == null) {
+ return null;
+ } else {
+ return value.getResponse();
+ }
+ }
+
+ @Override
+ public synchronized long renewToken(Token token,
+ String renewer) throws InvalidToken, IOException {
+ long renewTime = super.renewToken(token, renewer);
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ DelegationTokenOperationValue value = getDelegationTokenOperationValue(
+ rmDTIdentifier, DelegationTokenOperationType.RENEW);
+ if (value != null) {
+ value.setResponse(renewTime);
+ }
+ return renewTime;
+ }
+
+ public synchronized void recordToken(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Token token) {
+ DelegationTokenOperationValue value = getDelegationTokenOperationValue(
+ rmDTIdentifier, DelegationTokenOperationType.GET);
+ if (value != null) {
+ value.setResponse(token);
+ }
+ }
+
+ @Override
+ public void handle(RMDelegationTokenEvent event) {
+ DelegationTokenOperationValue value;
+ switch (event.getType()) {
+ case DT_STORED:
+ value = getDelegationTokenOperationValue(
+ event.getRMDelegationTokenIdentifier(),
+ DelegationTokenOperationType.GET);
+ if (value != null) {
+ value.setState(DelegationTokenOperationState.FINISHED);
+ }
+ LOG.info("zshen test stored");
+ break;
+ case DT_REMOVED:
+ value = getDelegationTokenOperationValue(
+ event.getRMDelegationTokenIdentifier(),
+ DelegationTokenOperationType.CANCEL);
+ if (value != null) {
+ value.setState(DelegationTokenOperationState.FINISHED);
+ }
+ break;
+ case DT_UPDATED:
+ value = getDelegationTokenOperationValue(
+ event.getRMDelegationTokenIdentifier(),
+ DelegationTokenOperationType.RENEW);
+ if (value != null) {
+ value.setState(DelegationTokenOperationState.FINISHED);
+ }
+ break;
+ default:
+ LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
+ }
+ }
+
+ @Override
+ public void startThreads() throws IOException {
+ super.startThreads();
+ dtOperationsCleaner.start();
+ }
+
+ @Override
+ public void stopThreads() {
+ super.stopThreads();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopping delegation token operation cleaner thread");
+ running = false;
+
+ if (dtOperationsCleaner != null) {
+ synchronized (noInterruptsLock) {
+ dtOperationsCleaner.interrupt();
+ }
+ try {
+ dtOperationsCleaner.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Unable to join on delegation token operation cleaner thread", e);
+ }
+ }
+ }
+
+ protected DelegationTokenOperationValue getDelegationTokenOperationValue(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ DelegationTokenOperationKey key = new DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ DelegationTokenOperationValue value = dtOperations.get(key);
+ if (value == null) {
+ LOG.error("The delegation token operation " + type +
+ " of the identifier " + rmDTIdentifier + " is missing");
+ }
+ return value;
+ }
+
+ protected static enum DelegationTokenOperationType {
+ GET,
+ RENEW,
+ CANCEL
+ }
+
+ public static enum DelegationTokenOperationState {
+ NEW,
+ PROGRESSING,
+ FINISHED
+ }
+
+ protected static class DelegationTokenOperationKey {
+ private RMDelegationTokenIdentifier rmDTIdentifier;
+ private DelegationTokenOperationType type;
+
+ public DelegationTokenOperationKey(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.type = type;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // generated by eclipse
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ DelegationTokenOperationKey other = (DelegationTokenOperationKey) obj;
+ if (rmDTIdentifier == null) {
+ if (other.rmDTIdentifier != null)
+ return false;
+ } else if (!(
+ // use ower/real user/renewer only to identify a DT operation
+ rmDTIdentifier.getOwner().equals(other.rmDTIdentifier.getOwner()) &&
+ rmDTIdentifier.getRealUser().equals(other.rmDTIdentifier.getRealUser()) &&
+ rmDTIdentifier.getRenewer().equals(other.rmDTIdentifier.getRenewer())))
+ return false;
+ if (type != other.type)
+ return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ // generated by eclipse
+ final int prime = 31;
+ int result = 1;
+ // use ower/real user/renewer only to identify a DT operation
+ String rmDTIdentifierStr = rmDTIdentifier.getOwner().toString() +
+ rmDTIdentifier.getRealUser().toString() +
+ rmDTIdentifier.getRenewer().toString();
+ result =
+ prime * result
+ + ((rmDTIdentifierStr == null) ? 0 : rmDTIdentifierStr.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
+ return result;
+ }
+
+ }
+
+ protected static class DelegationTokenOperationValue {
+ private Object response;
+ private DelegationTokenOperationState state;
+ private long finishTime;
+
+ public DelegationTokenOperationValue() {
+ state = DelegationTokenOperationState.PROGRESSING;
+ finishTime = Long.MAX_VALUE;
+ }
+
+ public Object getResponse() {
+ return response;
+ }
+
+ public void setResponse(Object response) {
+ this.response = response;
+ }
+
+ public DelegationTokenOperationState getState() {
+ return state;
+ }
+
+ public void setState(DelegationTokenOperationState state) {
+ this.state = state;
+ if (state == DelegationTokenOperationState.FINISHED) {
+ finishTime = System.currentTimeMillis();
+ }
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index ca6dc3e..deb8d30 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -19,21 +19,21 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,14 +51,19 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -72,6 +77,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,7 +87,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -89,6 +95,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -114,9 +121,20 @@
@BeforeClass
public static void setupSecretManager() throws IOException {
+ Configuration conf = new YarnConfiguration();
RMContext rmContext = mock(RMContext.class);
- when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
- dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
+ RMStateStore store = new MemoryRMStateStore();
+ store.init(conf);
+ store.start();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ store.setRMDispatcher(dispatcher);
+ when(rmContext.getStateStore()).thenReturn(store);
+
+ dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext,
+ new YarnConfiguration());
+ dispatcher.register(RMDelegationTokenEventType.class, dtsm);
dtsm.startThreads();
}
@@ -325,6 +343,76 @@ private void checkTokenRenewal(UserGroupInformation owner,
rmService.renewDelegationToken(request);
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testGetRnewCancelDelegationToken() throws Exception {
+ RMContext rmContext = mock(RMContext.class);
+ final ClientRMService rmService = new ClientRMService(
+ rmContext, null, null, null, null, dtsm);
+
+ int count = 0;
+ int maxCount = 10;
+ GetDelegationTokenResponse getResp = null;
+ for (; count < maxCount; ++count) {
+ getResp = owner.doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public GetDelegationTokenResponse run() throws Exception {
+ GetDelegationTokenRequest request =
+ GetDelegationTokenRequest.newInstance(owner.getUserName());
+ return rmService.getDelegationToken(request);
+ }
+ });
+ if (getResp.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertTrue(count > 0);
+ Assert.assertTrue(count < 10);
+
+ final org.apache.hadoop.yarn.api.records.Token token =
+ getResp.getRMDelegationToken();
+ for (count = 0; count < maxCount; ++count) {
+ RenewDelegationTokenResponse renewResp =
+ owner.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RenewDelegationTokenResponse run() throws Exception {
+ RenewDelegationTokenRequest request =
+ RenewDelegationTokenRequest.newInstance(token);
+ return rmService.renewDelegationToken(request);
+ }
+ });
+ if (renewResp.getIsRenewed()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertTrue(count > 0);
+ Assert.assertTrue(count < 10);
+
+ for (count = 0; count < maxCount; ++count) {
+ CancelDelegationTokenResponse cancelResp =
+ owner.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public CancelDelegationTokenResponse run() throws Exception {
+ CancelDelegationTokenRequest request =
+ CancelDelegationTokenRequest.newInstance(token);
+ return rmService.cancelDelegationToken(request);
+ }
+ });
+ if (cancelResp.getIsCanceled()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertTrue(count > 0);
+ Assert.assertTrue(count < 10);
+ }
+
@Test (timeout = 30000)
@SuppressWarnings ("rawtypes")
public void testAppSubmit() throws Exception {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
index d389c0e..c80c688 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
@@ -55,15 +55,19 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -82,7 +86,7 @@ public void resetSecretManager() {
RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
}
- @Test
+ @Test (timeout = 30000)
public void testDelegationToken() throws IOException, InterruptedException {
final YarnConfiguration conf = new YarnConfiguration();
@@ -362,8 +366,19 @@ public Server getServer(Class protocol, Object instance,
GetDelegationTokenRequest request = Records
.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewerString);
- return clientRMService.getDelegationToken(request)
- .getRMDelegationToken();
+ do {
+ GetDelegationTokenResponse response =
+ clientRMService.getDelegationToken(request);
+ if (response.getIsObtained()) {
+ return response.getRMDelegationToken();
+ } else {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ fail();
+ }
+ }
+ } while (true);
}
});
return token;
@@ -465,11 +480,20 @@ private static ResourceScheduler createMockScheduler(Configuration conf) {
createRMDelegationTokenSecretManager(long secretKeyInterval,
long tokenMaxLifetime, long tokenRenewInterval) {
RMContext rmContext = mock(RMContext.class);
- when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
+ Configuration conf = new YarnConfiguration();
+ RMStateStore store = new MemoryRMStateStore();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ store.init(conf);
+ store.start();
+ when(rmContext.getStateStore()).thenReturn(store);
RMDelegationTokenSecretManager rmDtSecretManager =
new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
- tokenRenewInterval, 3600000, rmContext);
+ tokenRenewInterval, 3600000, rmContext, conf);
+ dispatcher.register(RMDelegationTokenEventType.class, rmDtSecretManager);
+ store.setRMDispatcher(dispatcher);
return rmDtSecretManager;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index d396262..4312e70 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -47,6 +47,8 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -54,6 +56,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,8 +80,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -1182,7 +1186,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
rm2.stop();
}
- @Test
+ @Test (timeout = 30000)
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(
@@ -1213,8 +1217,15 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
GetDelegationTokenRequest.newInstance("renewer1");
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
AuthMethod.KERBEROS);
- GetDelegationTokenResponse response1 =
- rm1.getClientRMService().getDelegationToken(request1);
+ GetDelegationTokenResponse response1;
+ do {
+ response1 = rm1.getClientRMService().getDelegationToken(request1);
+ if (response1.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
response1.getRMDelegationToken();
Token token1 =
@@ -1252,8 +1263,15 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// request one more token
GetDelegationTokenRequest request2 =
GetDelegationTokenRequest.newInstance("renewer2");
- GetDelegationTokenResponse response2 =
- rm1.getClientRMService().getDelegationToken(request2);
+ GetDelegationTokenResponse response2;
+ do {
+ response2 = rm1.getClientRMService().getDelegationToken(request2);
+ if (response2.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
org.apache.hadoop.yarn.api.records.Token delegationToken2 =
response2.getRMDelegationToken();
Token token2 =
@@ -1261,12 +1279,17 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
// cancel token2
- try{
- rm1.getRMDTSecretManager().cancelToken(token2,
- UserGroupInformation.getCurrentUser().getUserName());
- } catch(Exception e) {
- Assert.fail();
- }
+ CancelDelegationTokenRequest request3 =
+ CancelDelegationTokenRequest.newInstance(delegationToken2);
+ do {
+ CancelDelegationTokenResponse response3 =
+ rm1.getClientRMService().cancelDelegationToken(request3);
+ if (response3.getIsCanceled()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
// Assert the token which has the latest delegationTokenSequenceNumber is removed
Assert.assertEquals(
@@ -1293,14 +1316,20 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// renewDate before renewing
Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
- try{
- // Sleep for one millisecond to make sure renewDataAfterRenew is greater
- Thread.sleep(1);
- // renew recovered token
- rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
- } catch(Exception e) {
- Assert.fail();
- }
+ // Sleep for one millisecond to make sure renewDataAfterRenew is greater
+ Thread.sleep(1);
+ // renew recovered token
+ RenewDelegationTokenRequest request4 =
+ RenewDelegationTokenRequest.newInstance(delegationToken1);
+ do {
+ RenewDelegationTokenResponse response4 =
+ rm2.getClientRMService().renewDelegationToken(request4);
+ if (response4.getIsRenewed()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
Long renewDateAfterRenew = allTokensRM2.get(dtId1);
@@ -1312,12 +1341,17 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// assert old token is removed from state store
Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew));
- try{
- rm2.getRMDTSecretManager().cancelToken(token1,
- UserGroupInformation.getCurrentUser().getUserName());
- } catch(Exception e) {
- Assert.fail();
- }
+ CancelDelegationTokenRequest request5 =
+ CancelDelegationTokenRequest.newInstance(delegationToken1);
+ do {
+ CancelDelegationTokenResponse response5 =
+ rm2.getClientRMService().cancelDelegationToken(request5);
+ if (response5.getIsCanceled()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
// assert token is removed from state after its cancelled
allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
@@ -1331,7 +1365,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// This is to test submit an application to the new RM with the old delegation
// token got from previous RM.
- @Test
+ @Test (timeout = 30000)
public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -1349,8 +1383,15 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
GetDelegationTokenRequest.newInstance("renewer1");
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
AuthMethod.KERBEROS);
- GetDelegationTokenResponse response1 =
- rm1.getClientRMService().getDelegationToken(request1);
+ GetDelegationTokenResponse response1;
+ do {
+ response1 = rm1.getClientRMService().getDelegationToken(request1);
+ if (response1.getIsObtained()) {
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while (true);
Token token1 =
ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index ff110b3..7ea16bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -69,7 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
-public class RMStateStoreTestBase extends ClientBaseWithFixes{
+public class RMStateStoreTestBase extends ClientBaseWithFixes {
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
@@ -343,7 +343,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
store.close();
}
- public void testRMDTSecretManagerStateStore(
+ void testRMDTSecretManagerStateStore(
RMStateStoreHelper stateStoreHelper) throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
@@ -367,12 +367,30 @@ public void testRMDTSecretManagerStateStore(
keySet.add(key);
store.storeRMDTMasterKey(key);
+ // let things settle down
+ Thread.sleep(1000);
+
RMDTSecretManagerState secretManagerState =
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, secretManagerState.getTokenState());
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
+
+ // check renew token
+ renewDate1 = new Long(System.currentTimeMillis());
+ sequenceNumber = 1112;
+ token1.put(dtId1, renewDate1);
+ store.updateRMDelegationTokenAndSequenceNumber(dtId1, renewDate1, sequenceNumber);
+
+ // let things settle down
+ Thread.sleep(1000);
+
+ secretManagerState = store.loadState().getRMDTSecretManagerState();
+ Assert.assertEquals(token1, secretManagerState.getTokenState());
+ Assert.assertEquals(sequenceNumber,
+ secretManagerState.getDTSequenceNumber());
+
store.close();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
index 6cc0a18..1488838 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager.DelegationTokenOperationType;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -96,10 +97,17 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
// request to generate a RMDelegationToken
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer1");
- GetDelegationTokenResponse response =
- rm1.getClientRMService().getDelegationToken(request);
- org.apache.hadoop.yarn.api.records.Token delegationToken =
- response.getRMDelegationToken();
+ org.apache.hadoop.yarn.api.records.Token delegationToken;
+ do {
+ GetDelegationTokenResponse response =
+ rm1.getClientRMService().getDelegationToken(request);
+ if (response.getIsObtained()) {
+ delegationToken = response.getRMDelegationToken();
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ } while(true);
Token token1 =
ConverterUtils.convertFromYarn(delegationToken, null);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
@@ -159,6 +167,47 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
}
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testCleanupOutstandingFinishedDTOperations() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ conf.setLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS,
+ 1000);
+ conf.setLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS,
+ 500);
+ MockRM rm1 = new MyMockRM(conf, memStore);
+ rm1.start();
+ RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+
+ for (DelegationTokenOperationType type : DelegationTokenOperationType.values()) {
+ RMDelegationTokenIdentifier rmDTIdentifier = new RMDelegationTokenIdentifier();
+ RMDelegationTokenSecretManager.DelegationTokenOperationKey key =
+ new RMDelegationTokenSecretManager.DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ RMDelegationTokenSecretManager.DelegationTokenOperationValue value =
+ new RMDelegationTokenSecretManager.DelegationTokenOperationValue();
+ dtSecretManager.dtOperations.put(key, value);
+ }
+ Assert.assertEquals(DelegationTokenOperationType.values().length,
+ dtSecretManager.dtOperations.size());
+ for (RMDelegationTokenSecretManager.DelegationTokenOperationValue value :
+ dtSecretManager.dtOperations.values()) {
+ value.setState(
+ RMDelegationTokenSecretManager.DelegationTokenOperationState.FINISHED);
+ }
+
+ // wait for cleanup
+ Thread.sleep(2000);
+
+ Assert.assertEquals(0, dtSecretManager.dtOperations.size());
+
+ rm1.stop();
+ }
+
class MyMockRM extends TestSecurityMockRM {
public MyMockRM(Configuration conf, RMStateStore store) {
@@ -167,11 +216,12 @@ public MyMockRM(Configuration conf, RMStateStore store) {
@Override
protected RMDelegationTokenSecretManager
- createRMDelegationTokenSecretManager(RMContext rmContext) {
+ createRMDelegationTokenSecretManager(RMContext rmContext,
+ Configuration conf) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
- rmContext);
+ rmContext, conf);
}
}
@@ -181,10 +231,11 @@ public MyMockRM(Configuration conf, RMStateStore store) {
public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval, RMContext rmContext) {
+ long delegationTokenRemoverScanInterval, RMContext rmContext,
+ Configuration conf) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
- rmContext);
+ rmContext, conf);
}
@Override