diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 4f9922507be..1b4339ba5d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -102,4 +102,8 @@ public abstract void setRegisteringCollectors(Map getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c721e15..de8545be938 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -118,4 +118,9 @@ public abstract void setContainerQueuingLimit( public abstract void addAllContainersToDecrease( Collection containersToDecrease); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index c59127a74b3..fd10bc29fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -455,4 +455,20 @@ public void setLogAggregationReportsForApps( } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasTokenSequenceNo()) { + return 0; + } + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc4aec..8628b126678 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -874,5 +874,21 @@ private TokenProto convertToProtoFormat(Token t) { private TokenPBImpl convertFromProtoFormat(TokenProto p) { return new TokenPBImpl(p); } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasTokenSequenceNo()) { + return 0; + } + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a384d9..02a001249dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,7 @@ message NodeHeartbeatRequestProto { repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; optional NodeAttributesProto nodeAttributes = 7; + optional int64 tokenSequenceNo = 8; } message LogAggregationReportProto { @@ -128,6 +129,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional int64 tokenSequenceNo = 18; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92c277..c069baf2ea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -157,6 +157,7 @@ private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; private NodeAttributesProvider nodeAttributesProvider; + private long tokenSequenceNo; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -1145,6 +1146,8 @@ public void run() { } } + request.setTokenSequenceNo( + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1227,6 +1230,8 @@ public void run() { updateTimelineCollectorData(response); } + NodeStatusUpdaterImpl.this.tokenSequenceNo = + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f829a4cbbc4..d592eafff9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -124,6 +125,8 @@ private ProxyCAManager proxyCAManager; private VolumeManager volumeManager; + private AtomicLong tokenSequenceNo = new AtomicLong(1); + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -583,4 +586,21 @@ public VolumeManager getVolumeManager() { public void setVolumeManager(VolumeManager volumeManager) { this.volumeManager = volumeManager; } + + /** + * Get token sequence no. + * + * @return the tokenSequenceNo + */ + public Long getTokenSequenceNo() { + return tokenSequenceNo.get(); + } + + /** + * Increment token sequence no. + * + */ + public void incrTokenSequenceNo() { + this.tokenSequenceNo.incrementAndGet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 4e9846c731b..9adc9c857a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -198,4 +198,8 @@ void setMultiNodeSortingManager( VolumeManager getVolumeManager(); void setVolumeManager(VolumeManager volumeManager); + + long getTokenSequenceNo(); + + void incrTokenSeqeuenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ab71134c93f..62467b528d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -666,4 +666,14 @@ public void setVolumeManager(VolumeManager volumeManager) { public NodeAttributesManager getNodeAttributesManager() { return activeServiceContext.getNodeAttributesManager(); } + + @Override + public long getTokenSequenceNo() { + return this.activeServiceContext.getTokenSequenceNo(); + } + + @Override + public void incrTokenSeqeuenceNo() { + this.activeServiceContext.incrTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..06cb6bc50d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -618,11 +618,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) populateKeys(request, nodeHeartBeatResponse); - ConcurrentMap systemCredentials = - rmContext.getSystemCredentialsForApps(); - if (!systemCredentials.isEmpty()) { - nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); - } + populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineV2Enabled) { // Return collectors' map that NM needs to know @@ -893,4 +889,22 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private void populateTokenSequenceNo(NodeHeartbeatRequest request, + NodeHeartbeatResponse nodeHeartBeatResponse) { + LOG.debug("Token sequence no received from hearbeat request: " + + request.getTokenSequenceNo() + ". Current token sequeunce no: " + + this.rmContext.getTokenSequenceNo()); + if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) { + ConcurrentMap systemCredentials = + rmContext.getSystemCredentialsForApps(); + LOG.debug("System credentials for apps size: " + + systemCredentials.size()); + if (!systemCredentials.isEmpty()) { + nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); + } + } + nodeHeartBeatResponse.setTokenSequenceNo( + this.rmContext.getTokenSequenceNo()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a9f8cd16bee..1f8e6250309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -679,6 +679,7 @@ private void requestNewHdfsDelegationTokenAsProxyUser( Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + boolean incrTokenSequenceNo = false; if (!hasProxyUserPrivileges) { LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); return; @@ -703,9 +704,15 @@ private void requestNewHdfsDelegationTokenAsProxyUser( appTokens.get(applicationId).add(tokenToRenew); } LOG.info("Received new token " + token); + incrTokenSequenceNo = true; } } } + + if(incrTokenSequenceNo) { + this.rmContext.incrTokenSeqeuenceNo(); + } + DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e2839552ee..fe3a889af9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -68,6 +68,7 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); private Set nodeLabels; + private long tokenSequenceNo; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -278,6 +279,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); req.setRegisteringCollectors(this.registeringCollectors); + req.setTokenSequenceNo(this.tokenSequenceNo); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); @@ -302,6 +304,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, capability = Resources.clone(newResource); } + this.tokenSequenceNo = heartbeatResponse.getTokenSequenceNo(); return heartbeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b327f1..fb71c42c46d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -146,8 +147,13 @@ public boolean isManaged(Token token) throws IOException { @Override public long renew(Token t, Configuration conf) throws IOException { if ( !(t instanceof MyToken)) { - // renew in 3 seconds - return System.currentTimeMillis() + 3000; + if(conf.get("override_token_expire_time") != null) { + return System.currentTimeMillis() + + Long.parseLong(conf.get("override_token_expire_time")); + } else { + // renew in 3 seconds + return System.currentTimeMillis() + 3000; + } } MyToken token = (MyToken)t; if(token.isCanceled()) { @@ -201,6 +207,7 @@ public void setUp() throws Exception { counter = new AtomicInteger(0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("override_token_expire_time", "3000"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); dispatcher = new AsyncDispatcher(eventQueue); @@ -890,7 +897,7 @@ protected void doSecureLogin() throws IOException { } - @Test (timeout = 20000) + @Test (timeout = 30000) public void testReplaceExpiringDelegationToken() throws Exception { conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -1444,4 +1451,121 @@ public void testShutDown() { delegationTokenRenewer.applicationFinished( BuilderUtils.newApplicationId(0, 1)); } + + @Test + public void testTokenSequencNo() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + // create Token1: + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + + // set max date to 0 to simulate an expiring token; + dtId1.setMaxDate(0); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + + // create token2 + Text userText2 = new Text("user2"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token expectedToken = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + + // Set token expire time to 60s. + conf.set("override_token_expire_time", "60000"); + expectedToken.renew(conf); + + final MockRM rm = new TestSecurityMockRM(conf, null) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(expectedToken.getService(), expectedToken); + return new Token[] {expectedToken}; + } + }; + } + }; + rm.start(); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + RegisterNodeManagerResponse regNMResponse = nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + assertEquals(0, response.getSystemCredentialsForApps().size()); + + RMApp app = rm.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + // wait for the initial expiring hdfs token to be removed from allTokens + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return + rm.getRMContext().getDelegationTokenRenewer().getAllTokens() + .get(token1) == null; + } + }, 5000, 110000); + + // wait for the initial expiring hdfs token to be removed from appTokens + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return !rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(token1); + } + }, 5000, 110000); + + // wait for the new retrieved hdfs token. + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(expectedToken); + } + }, 5000, 110000); + + NodeHeartbeatResponse response4 = nm1.nodeHeartbeat(true); + assertEquals(1, response4.getSystemCredentialsForApps().size()); + + ByteBuffer tokenBuffer = + response4.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken)); + + // wait for the hdfs tokens expiration so that renewal happens and + // creds would get passed as part of heartbeat response + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + try { + NodeHeartbeatResponse response7 = nm1.nodeHeartbeat(true); + if (response7.getSystemCredentialsForApps().size() == 1) { + assertEquals(1, response7.getSystemCredentialsForApps().size()); + return true; + } else { + return false; + } + } catch (Exception e) { + return false; + } + } + }, 5000, 90000); + } }