diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 4f9922507be..1b4339ba5d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -102,4 +102,8 @@ public abstract void setRegisteringCollectors(Map getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c721e15..de8545be938 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -118,4 +118,9 @@ public abstract void setContainerQueuingLimit( public abstract void addAllContainersToDecrease( Collection containersToDecrease); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index 675b375c1d6..ffc5cf7a8a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -58,4 +58,8 @@ public abstract void setAreNodeLabelsAcceptedByRM( boolean areNodeLabelsAcceptedByRM); + + public abstract void setTokenSequenceNo(long sequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index c59127a74b3..b64b8f3572b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -455,4 +455,16 @@ public void setLogAggregationReportsForApps( } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc4aec..6ae34816f40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -874,5 +874,17 @@ private TokenProto convertToProtoFormat(Token t) { private TokenPBImpl convertFromProtoFormat(TokenProto p) { return new TokenPBImpl(p); } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index 63213090d45..cbfa3869892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -269,4 +269,16 @@ public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) { maybeInitBuilder(); this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); } + + @Override + public void setTokenSequenceNo(long sequenceNo) { + this.builder.setTokenSequenceNo(sequenceNo); + } + + @Override + public long getTokenSequenceNo() { + RegisterNodeManagerResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a384d9..9e6d9cb3983 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -83,6 +83,7 @@ message RegisterNodeManagerResponseProto { optional string rm_version = 6; optional bool areNodeLabelsAcceptedByRM = 7 [default = false]; optional ResourceProto resource = 8; + optional int64 tokenSequenceNo = 9; } message UnRegisterNodeManagerRequestProto { @@ -100,6 +101,7 @@ message NodeHeartbeatRequestProto { repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; optional NodeAttributesProto nodeAttributes = 7; + optional int64 tokenSequenceNo = 8; } message LogAggregationReportProto { @@ -128,6 +130,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional int64 tokenSequenceNo = 18; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92c277..4db18c99edf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -157,6 +157,7 @@ private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; private NodeAttributesProvider nodeAttributesProvider; + private long tokenSequenceNo; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -475,6 +476,8 @@ protected void registerWithRM() .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); LOG.info(successfullRegistrationMsg.toString()); + + this.tokenSequenceNo = regNMResponse.getTokenSequenceNo(); } private List createKeepAliveApplicationList() { @@ -1145,6 +1148,8 @@ public void run() { } } + request.setTokenSequenceNo( + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1227,6 +1232,8 @@ public void run() { updateTimelineCollectorData(response); } + NodeStatusUpdaterImpl.this.tokenSequenceNo = + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 159659834c8..ef4edb985a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -118,6 +119,7 @@ private PlacementConstraintManager placementConstraintManager; private ResourceProfilesManager resourceProfilesManager; private MultiNodeSortingManager multiNodeSortingManager; + private AtomicLong tokenSequenceNo = new AtomicLong(1); public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -554,4 +556,21 @@ public void setResourceProfilesManager( ResourceProfilesManager resourceProfilesManager) { this.resourceProfilesManager = resourceProfilesManager; } + + /** + * Get token sequence no. + * + * @return the tokenSequenceNo + */ + public Long getTokenSequenceNo() { + return tokenSequenceNo.get(); + } + + /** + * Increment token sequence no. + * + */ + public void incrTokenSequenceNo() { + this.tokenSequenceNo.incrementAndGet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index d3daa05e16a..f6b4188bb92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -188,4 +188,8 @@ void setPlacementConstraintManager( void setMultiNodeSortingManager( MultiNodeSortingManager multiNodeSortingManager); + + long getTokenSequenceNo(); + + void incrTokenSeqeuenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 5b295f63c17..4686c08d4fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -637,6 +637,16 @@ public String getAppProxyUrl(Configuration conf, ApplicationId applicationId) public void setResourceProfilesManager(ResourceProfilesManager mgr) { this.activeServiceContext.setResourceProfilesManager(mgr); } + + @Override + public long getTokenSequenceNo() { + return this.activeServiceContext.getTokenSequenceNo(); + } + + @Override + public void incrTokenSeqeuenceNo() { + this.activeServiceContext.incrTokenSequenceNo(); + } // Note: Read java doc before adding any services over here. @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..44ebb213f65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -521,6 +521,9 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNodeAction(NodeAction.NORMAL); response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); response.setRMVersion(YarnVersionInfo.getVersion()); + //-1 is to ensure heartbeat ping (happens immediately after registration) + //receive credentials + response.setTokenSequenceNo(this.rmContext.getTokenSequenceNo()-1); return response; } @@ -618,11 +621,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) populateKeys(request, nodeHeartBeatResponse); - ConcurrentMap systemCredentials = - rmContext.getSystemCredentialsForApps(); - if (!systemCredentials.isEmpty()) { - nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); - } + populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineV2Enabled) { // Return collectors' map that NM needs to know @@ -893,4 +892,22 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private void populateTokenSequenceNo(NodeHeartbeatRequest request, + NodeHeartbeatResponse nodeHeartBeatResponse) { + LOG.debug("Token sequence no received from hearbeat request: " + + request.getTokenSequenceNo() + ". Current token sequeunce no: " + + this.rmContext.getTokenSequenceNo()); + if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) { + ConcurrentMap systemCredentials = + rmContext.getSystemCredentialsForApps(); + LOG.debug("System credentials for apps size: " + + systemCredentials.size()); + if (!systemCredentials.isEmpty()) { + nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); + } + } + nodeHeartBeatResponse.setTokenSequenceNo( + this.rmContext.getTokenSequenceNo()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a9f8cd16bee..1f8e6250309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -679,6 +679,7 @@ private void requestNewHdfsDelegationTokenAsProxyUser( Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + boolean incrTokenSequenceNo = false; if (!hasProxyUserPrivileges) { LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); return; @@ -703,9 +704,15 @@ private void requestNewHdfsDelegationTokenAsProxyUser( appTokens.get(applicationId).add(tokenToRenew); } LOG.info("Received new token " + token); + incrTokenSequenceNo = true; } } } + + if(incrTokenSequenceNo) { + this.rmContext.incrTokenSeqeuenceNo(); + } + DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e2839552ee..cc728b8656c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -68,6 +68,7 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); private Set nodeLabels; + private long tokenSequenceNo; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -206,6 +207,7 @@ public RegisterNodeManagerResponse registerNode( } } responseId = 0; + this.tokenSequenceNo = registrationResponse.getTokenSequenceNo(); return registrationResponse; } @@ -278,6 +280,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); req.setRegisteringCollectors(this.registeringCollectors); + req.setTokenSequenceNo(this.tokenSequenceNo); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); @@ -302,6 +305,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, capability = Resources.clone(newResource); } + this.tokenSequenceNo = heartbeatResponse.getTokenSequenceNo(); return heartbeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 4f9469548ae..9d496658a1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; @@ -106,7 +107,8 @@ public void testRPCResponseId() throws IOException, YarnException { request1.setNodeId(nodeId); request1.setHttpPort(0); request1.setResource(capability); - resourceTrackerService.registerNodeManager(request1); + RegisterNodeManagerResponse response1 = + resourceTrackerService.registerNodeManager(request1); org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory. newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class); @@ -117,6 +119,7 @@ public void testRPCResponseId() throws IOException, YarnException { NodeHeartbeatRequest nodeHeartBeatRequest = recordFactory .newRecordInstance(NodeHeartbeatRequest.class); nodeHeartBeatRequest.setNodeStatus(nodeStatus); + nodeHeartBeatRequest.setTokenSequenceNo(response1.getTokenSequenceNo()); nodeStatus.setResponseId(0); NodeHeartbeatResponse response = resourceTrackerService.nodeHeartbeat( @@ -130,6 +133,7 @@ public void testRPCResponseId() throws IOException, YarnException { /* try calling with less response id */ response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest); Assert.assertTrue(response.getResponseId() == 2); + Assert.assertEquals(1, response.getTokenSequenceNo()); nodeStatus.setResponseId(0); response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b327f1..54219532d3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -146,8 +147,13 @@ public boolean isManaged(Token token) throws IOException { @Override public long renew(Token t, Configuration conf) throws IOException { if ( !(t instanceof MyToken)) { - // renew in 3 seconds - return System.currentTimeMillis() + 3000; + if(conf.get("override_token_expire_time") != null) { + return System.currentTimeMillis() + + Long.parseLong(conf.get("override_token_expire_time")); + } else { + // renew in 3 seconds + return System.currentTimeMillis() + 3000; + } } MyToken token = (MyToken)t; if(token.isCanceled()) { @@ -201,6 +207,7 @@ public void setUp() throws Exception { counter = new AtomicInteger(0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("override_token_expire_time", "3000"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); dispatcher = new AsyncDispatcher(eventQueue); @@ -890,7 +897,7 @@ protected void doSecureLogin() throws IOException { } - @Test (timeout = 20000) + @Test (timeout = 30000) public void testReplaceExpiringDelegationToken() throws Exception { conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -1444,4 +1451,118 @@ public void testShutDown() { delegationTokenRenewer.applicationFinished( BuilderUtils.newApplicationId(0, 1)); } + + @Test + public void testTokenSequencNo() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + // create Token1: + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + + // set max date to 0 to simulate an expiring token; + dtId1.setMaxDate(0); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + + // create token2 + Text userText2 = new Text("user2"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token expectedToken = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + + // Set token expire time to 60s. + conf.set("override_token_expire_time", "60000"); + expectedToken.renew(conf); + + final MockRM rm = new TestSecurityMockRM(conf, null) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(expectedToken.getService(), expectedToken); + return new Token[] {expectedToken}; + } + }; + } + }; + rm.start(); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + RegisterNodeManagerResponse regNMResponse = nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + assertEquals(0, response.getSystemCredentialsForApps().size()); + + RMApp app = rm.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + // wait for the initial expiring hdfs token to be removed from allTokens + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return + rm.getRMContext().getDelegationTokenRenewer().getAllTokens() + .get(token1) == null; + } + }, 5000, 110000); + + // wait for the initial expiring hdfs token to be removed from appTokens + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return !rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(token1); + } + }, 5000, 110000); + + // wait for the new retrieved hdfs token. + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(expectedToken); + } + }, 5000, 110000); + + NodeHeartbeatResponse response4 = nm1.nodeHeartbeat(true); + assertEquals(1, response4.getSystemCredentialsForApps().size()); + + ByteBuffer tokenBuffer = + response4.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken)); + + //Sleep for 10s. Still token wouldn't expired. + Thread.sleep(10000); + NodeHeartbeatResponse response5 = nm1.nodeHeartbeat(true); + assertEquals(0, response5.getSystemCredentialsForApps().size()); + + //Sleep for 10s. Still token wouldn't expired. + Thread.sleep(10000); + NodeHeartbeatResponse response6 = nm1.nodeHeartbeat(true); + assertEquals(0, response6.getSystemCredentialsForApps().size()); + + //Sleep for 70s. By this time, token would have expired and renewed + Thread.sleep(70000); + NodeHeartbeatResponse response7 = nm1.nodeHeartbeat(true); + assertEquals(1, response7.getSystemCredentialsForApps().size()); + } }