diff --git hadoop-common-project/hadoop-common/src/main/proto/Security.proto hadoop-common-project/hadoop-common/src/main/proto/Security.proto
index 5ff571d..e467383 100644
--- hadoop-common-project/hadoop-common/src/main/proto/Security.proto
+++ hadoop-common-project/hadoop-common/src/main/proto/Security.proto
@@ -43,7 +43,8 @@ message GetDelegationTokenRequestProto {
}
message GetDelegationTokenResponseProto {
- optional hadoop.common.TokenProto token = 1;
+ optional bool is_obtained = 1;
+ optional hadoop.common.TokenProto token = 2;
}
message RenewDelegationTokenRequestProto {
@@ -51,13 +52,15 @@ message RenewDelegationTokenRequestProto {
}
message RenewDelegationTokenResponseProto {
- required uint64 newExpiryTime = 1;
+ optional bool is_renewed = 1;
+ optional uint64 newExpiryTime = 2;
}
message CancelDelegationTokenRequestProto {
required hadoop.common.TokenProto token = 1;
}
-message CancelDelegationTokenResponseProto { // void response
+message CancelDelegationTokenResponseProto {
+ optional bool is_canceled = 1;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 864980b..a4d5f01 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -296,13 +296,22 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
/**
*
The interface used by clients to get delegation token, enabling the
- * containers to be able to talk to the service using those tokens.
- *
- *
The ResourceManager responds with the delegation
- * {@link Token} that can be used by the client to speak to this
- * service.
+ * containers to be able to talk to the service using those tokens.
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of getting the delegation
+ * token is completed or not.
+ * - The {@link Token} that can be used by the client to speak to this
+ * service.
+ *
+ * Note: users have to wait until this flag becomes true to get the
+ * {@link Token}.
+ *
* @param request request to get a delegation token for the client.
- * @return delegation token that can be used to talk to this service
+ * @return the response including a flag and a delegation token that can be
+ * used to talk to this service
* @throws YarnException
* @throws IOException
*/
@@ -314,9 +323,20 @@ public GetDelegationTokenResponse getDelegationToken(
/**
* Renew an existing delegation {@link Token}.
- *
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of renewing the delegation
+ * token is completed or not.
+ * - The new expiry time for the delegation token.
+ *
+ * Note: users have to wait until this flag becomes true to get new expiry
+ * time.
+ *
* @param request the delegation token to be renewed.
- * @return the new expiry time for the delegation token.
+ * @return the response including a flag and a new expiry time for the
+ * delegation token.
* @throws YarnException
* @throws IOException
*/
@@ -328,9 +348,18 @@ public RenewDelegationTokenResponse renewDelegationToken(
/**
* Cancel an existing delegation {@link Token}.
- *
+ *
+ *
+ * The response includes:
+ *
+ * - A flag which indicates that the process of canceling the delegation
+ * token is completed or not.
+ *
+ * Note: users have to wait until this flag becomes true to confirm
+ * the delegation token is canceled.
+ *
* @param request the delegation token to be cancelled.
- * @return an empty response.
+ * @return the response including a flag.
* @throws YarnException
* @throws IOException
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
index 495d03e..d605c60 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -31,9 +33,27 @@
public abstract class CancelDelegationTokenResponse {
@Private
@Unstable
- public static CancelDelegationTokenResponse newInstance() {
+ public static CancelDelegationTokenResponse newInstance(boolean isCanceled) {
CancelDelegationTokenResponse response =
Records.newRecord(CancelDelegationTokenResponse.class);
+ response.setIsCanceled(isCanceled);
return response;
}
+
+ /**
+ * Get the flag which indicates that the process of canceling delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsCanceled();
+
+ /**
+ * Set the flag which indicates that the process of canceling delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsCanceled(boolean isCanceled);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
index bb7fcf0..9613619 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenResponse.java
@@ -39,14 +39,32 @@
@Private
@Unstable
- public static GetDelegationTokenResponse newInstance(Token rmDTToken) {
+ public static GetDelegationTokenResponse newInstance(
+ boolean isObtained, Token rmDTToken) {
GetDelegationTokenResponse response =
Records.newRecord(GetDelegationTokenResponse.class);
+ response.setIsObtained(isObtained);
response.setRMDelegationToken(rmDTToken);
return response;
}
/**
+ * Get the flag which indicates that the process of getting delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsObtained();
+
+ /**
+ * Set the flag which indicates that the process of getting delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsObtained(boolean isObtained);
+
+ /**
* The Delegation tokens have a identifier which maps to
* {@link AbstractDelegationTokenIdentifier}.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
index f0f7fad..5c449f1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -31,18 +33,43 @@
@Private
@Unstable
- public static RenewDelegationTokenResponse newInstance(long expTime) {
+ public static RenewDelegationTokenResponse newInstance(
+ boolean isRenewed, long expTime) {
RenewDelegationTokenResponse response =
Records.newRecord(RenewDelegationTokenResponse.class);
+ response.setIsRenewed(isRenewed);
response.setNextExpirationTime(expTime);
return response;
}
+ /**
+ * Get the flag which indicates that the process of renewing delegation
+ * token is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsRenewed();
+
+ /**
+ * Set the flag which indicates that the process of renewing delegation
+ * token is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsRenewed(boolean isRenewed);
+
+ /**
+ * Get the new expiration time.
+ */
@Private
@Unstable
public abstract long getNextExpirationTime();
+ /**
+ * Set the new expiration time
+ */
@Private
@Unstable
public abstract void setNextExpirationTime(long expTime);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 839765c..a6f8f61 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -507,12 +506,24 @@
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
-
+
/** Delegation Token renewer thread count */
public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
RM_PREFIX + "delegation-token-renewer.thread-count";
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
+ /** Interval at which the thread of cleanup delegation token operations runs */
+ public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS =
+ RM_PREFIX + "delegation-token-operation.cleanup-interval-ms";
+ public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS =
+ 60000l;
+
+ /** The max life time of a delegation token operation after its finish before its cleanup */
+ public static final String RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS =
+ RM_PREFIX + "delegation-token-operation.cleanup-delay-ms";
+ public static final long DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS =
+ 30000l;
+
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index d35e1a4..74e876a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -268,8 +268,25 @@ public Token getRMDelegationToken(Text renewer)
GetDelegationTokenRequest rmDTRequest =
Records.newRecord(GetDelegationTokenRequest.class);
rmDTRequest.setRenewer(renewer.toString());
- GetDelegationTokenResponse response =
- rmClient.getDelegationToken(rmDTRequest);
+
+ GetDelegationTokenResponse response;
+ int pollCount = 0;
+ while (true) {
+ response = rmClient.getDelegationToken(rmDTRequest);
+ if (response.getIsObtained()) {
+ break;
+ }
+ // Notify the client through the log every 10 poll, in case the client
+ // is blocked here too long.
+ if (++pollCount % 10 == 0) {
+ LOG.info("Get the RM delegation token is not finished");
+ }
+ try {
+ //TODO: need to replace statePollIntervalMillis after YARN-1446
+ Thread.sleep(statePollIntervalMillis);
+ } catch (InterruptedException ie) {
+ }
+ }
return response.getRMDelegationToken();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
index ec2b2b2..791cc66 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java
@@ -20,16 +20,20 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
+import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProtoOrBuilder;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
-public class CancelDelegationTokenResponsePBImpl extends CancelDelegationTokenResponse {
+public class CancelDelegationTokenResponsePBImpl
+ extends CancelDelegationTokenResponse {
CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto
.getDefaultInstance();
+ CancelDelegationTokenResponseProto.Builder builder = null;
+ boolean viaProto = false;
public CancelDelegationTokenResponsePBImpl() {
}
@@ -39,6 +43,19 @@ public CancelDelegationTokenResponsePBImpl(
this.proto = proto;
}
+ @Override
+ public boolean getIsCanceled() {
+ CancelDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsCanceled();
+ }
+
+ @Override
+ public void setIsCanceled(boolean isCanceled) {
+ maybeInitBuilder();
+ builder.setIsCanceled(isCanceled);
+ }
+
public CancelDelegationTokenResponseProto getProto() {
return proto;
}
@@ -62,4 +79,12 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = CancelDelegationTokenResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
index 93f4b5b..9502b78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenResponsePBImpl.java
@@ -72,6 +72,19 @@ public void setRMDelegationToken(Token appToken) {
this.appToken = appToken;
}
+ @Override
+ public boolean getIsObtained() {
+ GetDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsObtained();
+ }
+
+ @Override
+ public void setIsObtained(boolean isObtained) {
+ maybeInitBuilder();
+ builder.setIsObtained(isObtained);
+ }
+
public GetDelegationTokenResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -128,4 +141,5 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
index 9d20b46..2007f5a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java
@@ -89,4 +89,18 @@ public void setNextExpirationTime(long expTime) {
maybeInitBuilder();
builder.setNewExpiryTime(expTime);
}
+
+ @Override
+ public boolean getIsRenewed() {
+ RenewDelegationTokenResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsRenewed();
+ }
+
+ @Override
+ public void setIsRenewed(boolean isRenewed) {
+ maybeInitBuilder();
+ builder.setIsRenewed(isRenewed);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c43dc1a..77e62e0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -437,6 +437,20 @@
+ Interval at which the thread of cleanup delegation token
+ operations runs
+ yarn.resourcemanager.delegation-token-operation.cleanup-interval-ms
+ 60000
+
+
+
+ The max life time of a delegation token operation after its
+ finish before its cleanup
+ yarn.resourcemanager.delegation-token-operation.cleanup-delay-ms
+ 30000
+
+
+
Interval for the roll over for the master key used to generate
application tokens
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index f0e8553..6bbe5fb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -100,7 +100,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
/**
@@ -610,8 +609,6 @@ public GetDelegationTokenResponse getDelegationToken(
"Delegation Token can be issued only with kerberos authentication");
}
- GetDelegationTokenResponse response =
- recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Text owner = new Text(ugi.getUserName());
Text realUser = null;
@@ -621,17 +618,26 @@ public GetDelegationTokenResponse getDelegationToken(
RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
realUser);
- Token realRMDTtoken =
- new Token(tokenIdentifier,
- this.rmDTSecretManager);
- response.setRMDelegationToken(
- BuilderUtils.newDelegationToken(
+ Boolean inProgress =
+ rmDTSecretManager.isGetDelegationTokenInProgress(tokenIdentifier);
+ if (inProgress != null) {
+ if (inProgress) {
+ return GetDelegationTokenResponse.newInstance(false, null);
+ } else {
+ Token realRMDTtoken =
+ rmDTSecretManager.pollGetDelegationTokenResponse(tokenIdentifier);
+ return GetDelegationTokenResponse.newInstance(true,
+ BuilderUtils.newDelegationToken(
realRMDTtoken.getIdentifier(),
realRMDTtoken.getKind().toString(),
realRMDTtoken.getPassword(),
- realRMDTtoken.getService().toString()
- ));
- return response;
+ realRMDTtoken.getService().toString()));
+ }
+ }
+ rmDTSecretManager.recordToken(tokenIdentifier,
+ new Token(
+ tokenIdentifier, rmDTSecretManager));
+ return GetDelegationTokenResponse.newInstance(false, null);
} catch(IOException io) {
throw RPCUtil.getRemoteException(io);
}
@@ -651,12 +657,21 @@ public RenewDelegationTokenResponse renewDelegationToken(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ Boolean inProgress =
+ rmDTSecretManager.isRenewDelegationTokenInProgress(rmDTIdentifier);
+ if (inProgress != null) {
+ if (inProgress) {
+ return RenewDelegationTokenResponse.newInstance(false, Long.MIN_VALUE);
+ } else {
+ long expTime =
+ rmDTSecretManager.pollRenewDelegationTokenResponse(rmDTIdentifier);
+ return RenewDelegationTokenResponse.newInstance(true, expTime);
+ }
+ }
String user = getRenewerForToken(token);
- long nextExpTime = rmDTSecretManager.renewToken(token, user);
- RenewDelegationTokenResponse renewResponse = Records
- .newRecord(RenewDelegationTokenResponse.class);
- renewResponse.setNextExpirationTime(nextExpTime);
- return renewResponse;
+ rmDTSecretManager.renewToken(token, user);
+ return RenewDelegationTokenResponse.newInstance(false, Long.MIN_VALUE);
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
@@ -675,9 +690,18 @@ public CancelDelegationTokenResponse cancelDelegationToken(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ Boolean inProgress =
+ rmDTSecretManager.isCancelDelegationTokenInProgress(rmDTIdentifier);
+ if (inProgress != null) {
+ if (!inProgress) {
+ rmDTSecretManager.pollCancelDelegationTokenResponse(rmDTIdentifier);
+ }
+ return CancelDelegationTokenResponse.newInstance(!inProgress);
+ }
String user = getRenewerForToken(token);
rmDTSecretManager.cancelToken(token, user);
- return Records.newRecord(CancelDelegationTokenResponse.class);
+ return CancelDelegationTokenResponse.newInstance(false);
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 597d18c..68a1dfb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -83,6 +83,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -427,8 +428,8 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
- rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
+ rmDispatcher.register(RMDelegationTokenEventType.class, rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
@@ -897,7 +898,8 @@ protected ResourceTrackerService createResourceTrackerService() {
}
protected RMDelegationTokenSecretManager
- createRMDelegationTokenSecretManager(RMContext rmContext) {
+ createRMDelegationTokenSecretManager(RMContext rmContext,
+ Configuration conf) {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -909,7 +911,7 @@ protected ResourceTrackerService createResourceTrackerService() {
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
- tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
+ tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext, conf);
}
protected ClientRMService createClientRMService() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index a845264..66d7042 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -59,6 +59,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenEventType;
@Private
@Unstable
@@ -431,18 +433,17 @@ protected abstract void updateApplicationAttemptStateInternal(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
/**
- * RMDTSecretManager call this to store the state of a delegation token
+ * Non-blocking
+ * API RMDTSecretManager call this to store the state of a delegation token
* and sequence number
*/
+ @SuppressWarnings("unchecked")
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) {
- try {
- storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
- latestSequenceNumber);
- } catch (Exception e) {
- notifyStoreOperationFailed(e);
- }
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreRMDelegationTokenAndSequenceNumberEvent(
+ rmDTIdentifier, renewDate, latestSequenceNumber));
}
/**
@@ -455,15 +456,29 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState(
int latestSequenceNumber) throws Exception;
/**
- * RMDTSecretManager call this to remove the state of a delegation token
+ * Non-blocking
+ * API RMDTSecretManager call this to update the state of a delegation token
+ * and sequence number
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void updateRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) {
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent(
+ rmDTIdentifier, renewDate, latestSequenceNumber));
+ }
+
+ /**
+ * Non-blocking
+ * API RMDTSecretManager call this to remove the state of a delegation token
*/
+ @SuppressWarnings("unchecked")
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
- try {
- removeRMDelegationTokenState(rmDTIdentifier);
- } catch (Exception e) {
- notifyStoreOperationFailed(e);
- }
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreRemoveRMDelegationToken(
+ rmDTIdentifier, sequenceNumber));
}
/**
@@ -665,6 +680,57 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
LOG.error("Error removing app: " + appId, e);
notifyStoreOperationFailed(e);
}
+ } else if (event.getType().equals(RMStateStoreEventType.STORE_RM_DT_SN)) {
+ RMStateStoreRMDelegationTokenAndSequenceNumberEvent rmDTAndSNEvent =
+ (RMStateStoreRMDelegationTokenAndSequenceNumberEvent) event;
+ Exception storedException = null;
+ LOG.info("Storing info for delegation token: " +
+ rmDTAndSNEvent.getRmDTIdentifier());
+ try {
+ storeRMDelegationTokenAndSequenceNumberState(
+ rmDTAndSNEvent.getRmDTIdentifier(), rmDTAndSNEvent.getRenewDate(),
+ rmDTAndSNEvent.getSequenceNumber());
+ notifyDoneStoringRMDelegationTokenAndSequenceNumber(
+ rmDTAndSNEvent.getRmDTIdentifier(), storedException);
+ } catch (Exception e) {
+ LOG.error("Error storing delegation token: " +
+ rmDTAndSNEvent.getRmDTIdentifier(), e);
+ notifyStoreOperationFailed(e);
+ }
+ } else if (event.getType().equals(RMStateStoreEventType.REMOVE_RM_DT_SN)) {
+ RMStateStoreRMDelegationTokenAndSequenceNumberEvent removeRmDTAndSNEvent =
+ (RMStateStoreRMDelegationTokenAndSequenceNumberEvent) event;
+ Exception removedException = null;
+ LOG.info("Removing info for delegation token: " +
+ removeRmDTAndSNEvent.getRmDTIdentifier());
+ try {
+ removeRMDelegationTokenState(removeRmDTAndSNEvent.getRmDTIdentifier());
+ notifyDoneRemovingRMDelegationToken(
+ removeRmDTAndSNEvent.getRmDTIdentifier(), removedException);
+ } catch (Exception e) {
+ LOG.error("Error removing delegation token: " +
+ removeRmDTAndSNEvent.getRmDTIdentifier(), e);
+ notifyStoreOperationFailed(e);
+ }
+ } else if (event.getType().equals(RMStateStoreEventType.UPDATE_RM_DT_SN)) {
+ RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent updateRmDTAndSNEvent =
+ (RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent) event;
+ Exception updatedException = null;
+ LOG.info("Updating info for delegation token: " +
+ updateRmDTAndSNEvent.getRmDTIdentifier());
+ try {
+ removeRMDelegationTokenState(updateRmDTAndSNEvent.getRmDTIdentifier());
+ storeRMDelegationTokenAndSequenceNumberState(
+ updateRmDTAndSNEvent.getRmDTIdentifier(),
+ updateRmDTAndSNEvent.getRenewDate(),
+ updateRmDTAndSNEvent.getSequenceNumber());
+ notifyDoneUpdatingRMDelegationTokenAndSequenceNumber(
+ updateRmDTAndSNEvent.getRmDTIdentifier(), updatedException);
+ } catch (Exception e) {
+ LOG.error("Error updating delegation token: " +
+ updateRmDTAndSNEvent.getRmDTIdentifier(), e);
+ notifyStoreOperationFailed(e);
+ }
} else {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
}
@@ -729,6 +795,60 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId
}
/**
+ * In (@link handleStoreEvent}, this method is called to notify the
+ * RMDelegationTokenSecretManager that new delegation token is stored in state
+ * store
+ * @param rmDTIdentifier identifier of the delegation token that has been saved
+ * @param storedException the exception that is thrown when storing the
+ * delegation token
+ */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneStoringRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Exception storedException) {
+ //TODO: YARN-1397 will clean the exception dependency
+ rmDispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_STORED,
+ rmDTIdentifier, storedException));
+ }
+
+ /**
+ * In (@link handleStoreEvent}, this method is called to notify the
+ * RMDelegationTokenSecretManager that new delegation token is removed from state
+ * store
+ * @param rmDTIdentifier identifier of the delegation token that has been removed
+ * @param storedException the exception that is thrown when removing the
+ * delegation token
+ */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneRemovingRMDelegationToken(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Exception storedException) {
+ //TODO: YARN-1397 will clean the exception dependency
+ rmDispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_REMOVED,
+ rmDTIdentifier, storedException));
+ }
+
+ /**
+ * In (@link handleStoreEvent}, this method is called to notify the
+ * RMDelegationTokenSecretManager that the delegation token is updated in state
+ * store
+ * @param rmDTIdentifier identifier of the delegation token that has been updated
+ * @param storedException the exception that is thrown when updating the
+ * delegation token
+ */
+ @SuppressWarnings("unchecked")
+ private void notifyDoneUpdatingRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Exception updatedException) {
+ //TODO: YARN-1397 will clean the exception dependency
+ rmDispatcher.getEventHandler().handle(
+ new RMDelegationTokenEvent(RMDelegationTokenEventType.DT_UPDATED,
+ rmDTIdentifier, updatedException));
+ }
+
+ /**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index 903f4e7..0339fa9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -23,5 +23,8 @@
STORE_APP,
UPDATE_APP,
UPDATE_APP_ATTEMPT,
- REMOVE_APP
+ REMOVE_APP,
+ STORE_RM_DT_SN,
+ REMOVE_RM_DT_SN,
+ UPDATE_RM_DT_SN
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java
new file mode 100644
index 0000000..7dc266f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDelegationTokenAndSequenceNumberEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMStateStoreRMDelegationTokenAndSequenceNumberEvent
+ extends RMStateStoreEvent {
+
+ private final RMDelegationTokenIdentifier rmDTIdentifier;
+ private final Long renewDate;
+ private final int sequenceNumber;
+
+ public RMStateStoreRMDelegationTokenAndSequenceNumberEvent(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Long renewDate, int sequenceNumber) {
+ super(RMStateStoreEventType.STORE_RM_DT_SN);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.renewDate = renewDate;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public RMDelegationTokenIdentifier getRmDTIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Long getRenewDate() {
+ return renewDate;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationToken.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationToken.java
new file mode 100644
index 0000000..5f0cd53
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveRMDelegationToken.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMStateStoreRemoveRMDelegationToken
+ extends RMStateStoreEvent {
+
+ private final RMDelegationTokenIdentifier rmDTIdentifier;
+ private final int sequenceNumber;
+
+ public RMStateStoreRemoveRMDelegationToken(
+ RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+ super(RMStateStoreEventType.REMOVE_RM_DT_SN);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public RMDelegationTokenIdentifier getRmDTIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java
new file mode 100644
index 0000000..47b713c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+public class RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent
+ extends RMStateStoreEvent {
+
+ private final RMDelegationTokenIdentifier rmDTIdentifier;
+ private final Long renewDate;
+ private final int sequenceNumber;
+
+ public RMStateStoreUpdateRMDelegationTokenAndSequenceNumberEvent(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Long renewDate, int sequenceNumber) {
+ super(RMStateStoreEventType.UPDATE_RM_DT_SN);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.renewDate = renewDate;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public RMDelegationTokenIdentifier getRmDTIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Long getRenewDate() {
+ return renewDate;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java
new file mode 100644
index 0000000..b4ad11d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+
+public class RMDelegationTokenEvent extends
+ AbstractEvent {
+
+ private RMDelegationTokenIdentifier rmDTIdentifier;
+ private Exception e;
+
+ public RMDelegationTokenEvent(RMDelegationTokenEventType type,
+ RMDelegationTokenIdentifier rmDTIdentifier, Exception e) {
+ super(type);
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.e = e;
+ }
+
+ public RMDelegationTokenIdentifier getRMDelegationTokenIdentifier() {
+ return rmDTIdentifier;
+ }
+
+ public Exception getException() {
+ return e;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java
new file mode 100644
index 0000000..9ac2c2c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+
+public enum RMDelegationTokenEventType {
+ //Source: RMStateStore
+ DT_STORED,
+ DT_REMOVED,
+ DT_UPDATED
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
index 23939de..ad0a495 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
@@ -23,15 +23,21 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -48,11 +54,15 @@
@InterfaceStability.Unstable
public class RMDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager implements
- Recoverable {
+ Recoverable, EventHandler{
private static final Log LOG = LogFactory
.getLog(RMDelegationTokenSecretManager.class);
protected final RMContext rmContext;
+ protected final ConcurrentMap dtOperations =
+ new ConcurrentHashMap();
+ protected final long dtOperationsCleanupInterval;
+ protected final long dtOperationsCleanupDelay;
/**
* Create a secret manager
@@ -63,15 +73,50 @@
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens
+ * @param rmContext
+ * @param conf
*/
public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval,
- RMContext rmContext) {
+ RMContext rmContext,
+ Configuration conf) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.rmContext = rmContext;
+ dtOperationsCleanupInterval = conf.getLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_INTERVAL_MS);
+ dtOperationsCleanupDelay = conf.getLong(
+ YarnConfiguration.RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_OPERATION_CLEANUP_DELAY_MS);
+ Thread dtOperationsCleaner = new Thread() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(dtOperationsCleanupInterval);
+ } catch (InterruptedException e) {
+ LOG.warn(
+ "The thread of cleanup finished delegation token operations is interrupted");
+ return;
+ }
+ for (DelegationTokenOperationKey key : dtOperations.keySet()) {
+ DelegationTokenOperationValue value = dtOperations.get(key);
+ if (value != null &&
+ value.getState() == DelegationTokenOperationState.FINISHED) {
+ long delay = System.currentTimeMillis() - value.getFinishTime();
+ if (delay >= dtOperationsCleanupDelay) {
+ dtOperations.remove(key);
+ }
+ }
+ }
+ }
+ }
+ };
+ dtOperationsCleaner.setDaemon(true);
+ dtOperationsCleaner.start();
}
@Override
@@ -122,9 +167,7 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id,
try {
LOG.info("updating RMDelegation token with sequence number: "
+ id.getSequenceNumber());
- rmContext.getStateStore().removeRMDelegationToken(id,
- delegationTokenSequenceNumber);
- rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
+ rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id,
renewDate, id.getSequenceNumber());
} catch (Exception e) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
@@ -195,4 +238,211 @@ public void recover(RMState rmState) throws Exception {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
+
+ public Boolean isGetDelegationTokenInProgress(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ return isDelegationTokenOperationInProgress(
+ rmDTIdentifier, DelegationTokenOperationType.GET);
+ }
+
+ public Boolean isCancelDelegationTokenInProgress(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ return isDelegationTokenOperationInProgress(
+ rmDTIdentifier, DelegationTokenOperationType.CANCEL);
+ }
+
+ public Boolean isRenewDelegationTokenInProgress(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ return isDelegationTokenOperationInProgress(
+ rmDTIdentifier, DelegationTokenOperationType.RENEW);
+ }
+
+ protected Boolean isDelegationTokenOperationInProgress(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ DelegationTokenOperationKey key = new DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ DelegationTokenOperationValue value = dtOperations.putIfAbsent(
+ key, new DelegationTokenOperationValue());
+ if (value == null) {
+ return null;
+ } else {
+ return value.getState() == DelegationTokenOperationState.PROGRESSING;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public Token pollGetDelegationTokenResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ Object obj = pollDelegationTokenOperationResponse(
+ rmDTIdentifier, DelegationTokenOperationType.GET);
+ assert obj instanceof Token>;
+ return (Token) obj;
+ }
+
+ public void pollCancelDelegationTokenResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ pollDelegationTokenOperationResponse(
+ rmDTIdentifier, DelegationTokenOperationType.CANCEL);
+ }
+
+ public long pollRenewDelegationTokenResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier) {
+ Object obj = pollDelegationTokenOperationResponse(
+ rmDTIdentifier, DelegationTokenOperationType.RENEW);
+ assert obj instanceof Long;
+ return (Long) obj;
+ }
+
+ protected Object pollDelegationTokenOperationResponse(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ DelegationTokenOperationKey key = new DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ DelegationTokenOperationValue value = dtOperations.remove(key);
+ if (value == null) {
+ return null;
+ } else {
+ return value.getResponse();
+ }
+ }
+
+ @Override
+ public synchronized long renewToken(Token token,
+ String renewer) throws InvalidToken, IOException {
+ long renewTime = super.renewToken(token, renewer);
+ RMDelegationTokenIdentifier rmDTIdentifier = token.decodeIdentifier();
+ DelegationTokenOperationValue value = getDelegationTokenOperationValue(
+ rmDTIdentifier, DelegationTokenOperationType.RENEW);
+ if (value != null) {
+ value.setResponse(renewTime);
+ }
+ return renewTime;
+ }
+
+ public synchronized void recordToken(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ Token token) {
+ DelegationTokenOperationValue value = getDelegationTokenOperationValue(
+ rmDTIdentifier, DelegationTokenOperationType.GET);
+ if (value != null) {
+ value.setResponse(token);
+ }
+ }
+
+ @Override
+ public void handle(RMDelegationTokenEvent event) {
+ DelegationTokenOperationValue value;
+ switch (event.getType()) {
+ case DT_STORED:
+ value = getDelegationTokenOperationValue(
+ event.getRMDelegationTokenIdentifier(),
+ DelegationTokenOperationType.GET);
+ if (value != null) {
+ value.setState(DelegationTokenOperationState.FINISHED);
+ }
+ break;
+ case DT_REMOVED:
+ value = getDelegationTokenOperationValue(
+ event.getRMDelegationTokenIdentifier(),
+ DelegationTokenOperationType.CANCEL);
+ if (value != null) {
+ value.setState(DelegationTokenOperationState.FINISHED);
+ }
+ break;
+ case DT_UPDATED:
+ value = getDelegationTokenOperationValue(
+ event.getRMDelegationTokenIdentifier(),
+ DelegationTokenOperationType.RENEW);
+ if (value != null) {
+ value.setState(DelegationTokenOperationState.FINISHED);
+ }
+ break;
+ default:
+ LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
+ }
+ }
+
+ protected DelegationTokenOperationValue getDelegationTokenOperationValue(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ DelegationTokenOperationKey key = new DelegationTokenOperationKey(
+ rmDTIdentifier, type);
+ DelegationTokenOperationValue value = dtOperations.get(key);
+ if (value == null) {
+ LOG.error("The delegation token operation " + type +
+ " of the identifier " + rmDTIdentifier + " is missing");
+ }
+ return value;
+ }
+ protected static enum DelegationTokenOperationType {
+ GET,
+ RENEW,
+ CANCEL
+ }
+
+ protected static enum DelegationTokenOperationState {
+ PROGRESSING,
+ FINISHED
+ }
+
+ protected static class DelegationTokenOperationKey {
+ private RMDelegationTokenIdentifier rmDTIdentifier;
+ private DelegationTokenOperationType type;
+
+ public DelegationTokenOperationKey(
+ RMDelegationTokenIdentifier rmDTIdentifier,
+ DelegationTokenOperationType type) {
+ this.rmDTIdentifier = rmDTIdentifier;
+ this.type = type;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof DelegationTokenOperationKey) {
+ DelegationTokenOperationKey that = (DelegationTokenOperationKey) obj;
+ return rmDTIdentifier.equals(that.rmDTIdentifier) &&
+ type.equals(that.type);
+ }
+ return false;
+ }
+ }
+
+ protected static class DelegationTokenOperationValue {
+ private Object response;
+ private DelegationTokenOperationState state;
+ private long finishTime;
+
+ public DelegationTokenOperationValue() {
+ state = DelegationTokenOperationState.PROGRESSING;
+ finishTime = Long.MAX_VALUE;
+ }
+
+ public Object getResponse() {
+ return response;
+ }
+
+ public void setResponse(Object response) {
+ this.response = response;
+ }
+
+ public DelegationTokenOperationState getState() {
+ return state;
+ }
+
+ public void setState(DelegationTokenOperationState state) {
+ this.state = state;
+ if (state == DelegationTokenOperationState.FINISHED) {
+ finishTime = System.currentTimeMillis();
+ }
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index ca6dc3e..331a48b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -19,21 +19,21 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,9 +51,9 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
@@ -116,7 +116,8 @@
public static void setupSecretManager() throws IOException {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
- dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
+ dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext,
+ new YarnConfiguration());
dtsm.startThreads();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
index d389c0e..cfcdfea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
@@ -469,7 +469,7 @@ private static ResourceScheduler createMockScheduler(Configuration conf) {
RMDelegationTokenSecretManager rmDtSecretManager =
new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
- tokenRenewInterval, 3600000, rmContext);
+ tokenRenewInterval, 3600000, rmContext, new YarnConfiguration());
return rmDtSecretManager;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
index 6cc0a18..ff71c59 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
@@ -167,7 +167,8 @@ public MyMockRM(Configuration conf, RMStateStore store) {
@Override
protected RMDelegationTokenSecretManager
- createRMDelegationTokenSecretManager(RMContext rmContext) {
+ createRMDelegationTokenSecretManager(RMContext rmContext,
+ Configuration conf) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
@@ -184,7 +185,7 @@ public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenRemoverScanInterval, RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
- rmContext);
+ rmContext, new YarnConfiguration());
}
@Override