diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 4008a97f668..f175cf3e6cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; @@ -92,12 +94,16 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto; import org.apache.hadoop.yarn.server.api.ContainerType; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.google.protobuf.ByteString; @Private @Unstable public class ProtoUtils { + public static final Interner BYTE_STRING_INTERNER = + Interners.newWeakInterner(); /* * ContainerState @@ -578,6 +584,18 @@ public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) { TimedPlacementConstraintProto.DelayUnit u) { return TimedPlacementConstraint.DelayUnit.valueOf(u.name()); } + + /* + * ApplicationId + */ + public static ApplicationIdPBImpl convertFromProtoFormat( + ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + public static ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 4f9922507be..1b4339ba5d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -102,4 +102,8 @@ public abstract void setRegisteringCollectors(Map getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c721e15..9fdfeddb3bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -28,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -88,13 +88,6 @@ public abstract void addAllContainersToSignal( public abstract void setDiagnosticsMessage(String diagnosticsMessage); - // Credentials (i.e. hdfs tokens) needed by NodeManagers for application - // localizations and logAggreations. - public abstract Map getSystemCredentialsForApps(); - - public abstract void setSystemCredentialsForApps( - Map systemCredentials); - public abstract boolean getAreNodeLabelsAcceptedByRM(); public abstract void setAreNodeLabelsAcceptedByRM( @@ -118,4 +111,15 @@ public abstract void setContainerQueuingLimit( public abstract void addAllContainersToDecrease( Collection containersToDecrease); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); + + // Credentials (i.e. hdfs tokens) needed by NodeManagers for application + // localizations and logAggregations. + public abstract void setSystemCredentialsForApps( + Collection systemCredentials); + + public abstract Collection getSystemCredentialsForApps(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index c59127a74b3..7d96c8cad5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -455,4 +455,17 @@ public void setLogAggregationReportsForApps( } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc4aec..e36d6f84e2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -26,10 +25,6 @@ import java.util.List; import java.util.Map; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.protobuf.ByteString; - import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; @@ -39,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; @@ -76,7 +70,8 @@ private List containersToCleanup = null; private List containersToBeRemovedFromNM = null; private List applicationsToCleanup = null; - private Map systemCredentials = null; + private List systemCredentials = + null; private Resource resource = null; private Map appCollectorsMap = null; @@ -88,9 +83,6 @@ private List containersToDecrease = null; private List containersToSignal = null; - private static final Interner BYTE_STRING_INTERNER = - Interners.newWeakInterner(); - public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -152,12 +144,7 @@ private void mergeLocalToBuilder() { private void addSystemCredentialsToProto() { maybeInitBuilder(); builder.clearSystemCredentialsForApps(); - for (Map.Entry entry : systemCredentials.entrySet()) { - builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) - .setCredentialsForApp(BYTE_STRING_INTERNER.intern( - ProtoUtils.convertToProtoFormat(entry.getValue().duplicate())))); - } + builder.addAllSystemCredentialsForApps(this.systemCredentials); } private void addAppCollectorsMapToProto() { @@ -168,7 +155,7 @@ private void addAppCollectorsMapToProto() { AppCollectorData data = entry.getValue(); AppCollectorDataProto.Builder appCollectorDataBuilder = AppCollectorDataProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) + .setAppId(ProtoUtils.convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion()); @@ -477,7 +464,7 @@ private void initApplicationsToCleanup() { this.applicationsToCleanup = new ArrayList(); for (ApplicationIdProto c : list) { - this.applicationsToCleanup.add(convertFromProtoFormat(c)); + this.applicationsToCleanup.add(ProtoUtils.convertFromProtoFormat(c)); } } @@ -510,7 +497,7 @@ public boolean hasNext() { @Override public ApplicationIdProto next() { - return convertToProtoFormat(iter.next()); + return ProtoUtils.convertToProtoFormat(iter.next()); } @Override @@ -644,15 +631,6 @@ public void remove() { builder.addAllContainersToDecrease(iterable); } - @Override - public Map getSystemCredentialsForApps() { - if (this.systemCredentials != null) { - return this.systemCredentials; - } - initSystemCredentials(); - return systemCredentials; - } - @Override public Map getAppCollectors() { if (this.appCollectorsMap != null) { @@ -662,24 +640,13 @@ public void remove() { return appCollectorsMap; } - private void initSystemCredentials() { - NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getSystemCredentialsForAppsList(); - this.systemCredentials = new HashMap (); - for (SystemCredentialsForAppsProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp()); - this.systemCredentials.put(appId, byteBuffer); - } - } - private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; List list = p.getAppCollectorsList(); if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId()); Token collectorToken = null; if (c.hasAppCollectorToken()){ collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); @@ -694,13 +661,26 @@ private void initAppCollectorsMap() { @Override public void setSystemCredentialsForApps( - Map systemCredentials) { - if (systemCredentials == null || systemCredentials.isEmpty()) { + Collection systemCredentialsForAppsProto) { + if (systemCredentialsForAppsProto == null + || systemCredentialsForAppsProto.isEmpty()) { return; } maybeInitBuilder(); - this.systemCredentials = new HashMap(); - this.systemCredentials.putAll(systemCredentials); + this.systemCredentials = + new ArrayList( + systemCredentialsForAppsProto); + } + + @Override + public Collection getSystemCredentialsForApps() { + if (systemCredentials != null) { + return systemCredentials; + } + + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + systemCredentials = p.getSystemCredentialsForAppsList(); + return systemCredentials; } @Override @@ -742,14 +722,6 @@ private ResourceProto convertToProtoFormat(Resource t) { return ProtoUtils.convertToProtoFormat(t); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - private NodeAction convertFromProtoFormat(NodeActionProto p) { return NodeAction.valueOf(p.name()); } @@ -874,5 +846,18 @@ private TokenProto convertToProtoFormat(Token t) { private TokenPBImpl convertFromProtoFormat(TokenProto p) { return new TokenPBImpl(p); } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java index f3331855597..83d22e9f6e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java @@ -18,12 +18,19 @@ package org.apache.hadoop.yarn.server.utils; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -66,4 +73,65 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, } return response; } + + /** + * Build SystemCredentialsForAppsProto objects + * + * @param applicationId Application ID + * @param credentials HDFS Tokens + * @return systemCredentialsForAppsProto SystemCredentialsForAppsProto + */ + public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto( + ApplicationId applicationId, ByteBuffer credentials) { + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + SystemCredentialsForAppsProto.newBuilder() + .setAppId(ProtoUtils.convertToProtoFormat(applicationId)) + .setCredentialsForApp(ProtoUtils.BYTE_STRING_INTERNER.intern( + ProtoUtils.convertToProtoFormat(credentials.duplicate()))) + .build(); + return systemCredentialsForAppsProto; + } + + /** + * Convert Collection of SystemCredentialsForAppsProto proto objects to a Map + * of ApplicationId to ByteBuffer + * + * @param systemCredentials List of SystemCredentialsForAppsProto proto + * objects + * @return systemCredentialsForApps Map of Application Id to ByteBuffer + */ + public static Map convertFromProtoFormat( + Collection systemCredentials) { + + Map systemCredentialsForApps = + new HashMap(systemCredentials.size()); + for (SystemCredentialsForAppsProto proto : systemCredentials) { + systemCredentialsForApps.put( + ProtoUtils.convertFromProtoFormat(proto.getAppId()), + ProtoUtils.convertFromProtoFormat(proto.getCredentialsForApp())); + } + return systemCredentialsForApps; + } + + /** + * Convert Map of Application Id to ByteBuffer to Collection of + * SystemCredentialsForAppsProto proto objects + * + * @param systemCredentials Map of Application Id to ByteBuffer + * @param systemCredentials List of SystemCredentialsForAppsProto proto + * objects + */ + public static List convertToProtoFormat( + Map systemCredentialsForApps) { + List systemCredentials = + new ArrayList( + systemCredentialsForApps.size()); + for (Map.Entry entry : systemCredentialsForApps + .entrySet()) { + SystemCredentialsForAppsProto proto = + newSystemCredentialsForAppsProto(entry.getKey(), entry.getValue()); + systemCredentials.add(proto); + } + return systemCredentials; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a384d9..02a001249dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,7 @@ message NodeHeartbeatRequestProto { repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; optional NodeAttributesProto nodeAttributes = 7; + optional int64 tokenSequenceNo = 8; } message LogAggregationReportProto { @@ -128,6 +129,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional int64 tokenSequenceNo = 18; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 8b1d0bb49e5..7cb4eb8f1a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +30,12 @@ import java.util.HashSet; import java.util.Map; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -56,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -153,10 +160,12 @@ public void testNodeHeartbeatRequestPBImplWithNullLabels() { /** * Test NodeHeartbeatResponsePBImpl. + * + * @throws IOException */ @Test - public void testNodeHeartbeatResponsePBImpl() { + public void testNodeHeartbeatResponsePBImpl() throws IOException { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); original.setDiagnosticsMessage("testDiagnosticMessage"); @@ -168,6 +177,28 @@ public void testNodeHeartbeatResponsePBImpl() { Map collectors = getCollectors(false); original.setAppCollectors(collectors); + // create token1 + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token expectedToken1 = + new Token(dtId1.getBytes(), + "password12".getBytes(), dtId1.getKind(), new Text("service1")); + + Credentials credentials1 = new Credentials(); + credentials1.addToken(expectedToken1.getService(), expectedToken1); + + DataOutputBuffer dob1 = new DataOutputBuffer(); + credentials1.writeTokenStorageToStream(dob1); + + ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob1.getData(), 0, dob1.getLength()); + + Map systemCredentials = + new HashMap(); + systemCredentials.put(getApplicationId(1), byteBuffer1); + original.setSystemCredentialsForApps( + YarnServerBuilderUtils.convertToProtoFormat(systemCredentials)); + NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); assertEquals(100, copy.getResponseId()); @@ -178,6 +209,22 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); + assertEquals(1, copy.getSystemCredentialsForApps().size()); + + Credentials credentials1Out = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + ByteBuffer buffer = + YarnServerBuilderUtils + .convertFromProtoFormat(copy.getSystemCredentialsForApps()) + .get(getApplicationId(1)); + Assert.assertNotNull(buffer); + buffer.rewind(); + buf.reset(buffer); + credentials1Out.readTokenStorageStream(buf); + assertEquals(1, credentials1Out.getAllTokens().size()); + // Ensure token1's password "password12" is available from proto response + assertEquals(10, + credentials1Out.getAllTokens().iterator().next().getPassword().length); } @Test @@ -376,7 +423,8 @@ public void testUnRegisterNodeManagerRequestPBImpl() throws Exception { AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); if (!hasNullCollectorToken) { data.setCollectorToken( - Token.newInstance(new byte[0], "kind", new byte[0], "s")); + org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0], + "kind", new byte[0], "s")); } Map collectorMap = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index e6e79d3f5dc..473557f8b89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; -import com.google.common.collect.Sets; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -38,28 +37,29 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.NodeAttribute; -import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; - import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb .NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; - import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestProtocolRecords { @Test @@ -154,14 +154,17 @@ public void testNodeHeartBeatResponse() throws IOException { DataOutputBuffer dob = new DataOutputBuffer(); app1Cred.writeTokenStorageToStream(dob); - ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); - record.setSystemCredentialsForApps(appCredentials); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer); + record.setSystemCredentialsForApps( + YarnServerBuilderUtils.convertToProtoFormat(appCredentials)); NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( ((NodeHeartbeatResponsePBImpl) record).getProto()); - Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps()); + Assert.assertEquals(appCredentials, YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps())); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92c277..1d42bb3b050 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.resource.Resources; @@ -157,6 +158,7 @@ private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; private NodeAttributesProvider nodeAttributesProvider; + private long tokenSequenceNo; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -1145,6 +1147,8 @@ public void run() { } } + request.setTokenSequenceNo( + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1183,7 +1187,8 @@ public void run() { CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); } Map systemCredentials = - response.getSystemCredentialsForApps(); + YarnServerBuilderUtils.convertFromProtoFormat( + response.getSystemCredentialsForApps()); if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context).setSystemCrendentialsForApps( parseCredentials(systemCredentials)); @@ -1227,6 +1232,8 @@ public void run() { updateTimelineCollectorData(response); } + NodeStatusUpdaterImpl.this.tokenSequenceNo = + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 8435340164a..736b7d1027a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.EOFException; @@ -79,6 +78,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl; +import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; @@ -112,6 +113,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater extends NodeManagerTestBase { @@ -325,16 +328,29 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(context, dispatcher, healthChecker, metrics); - resourceTracker = new MyResourceTracker4(context); + ResourceTracker resourceTracker = new MyResourceTracker4(context); + InetSocketAddress address = new InetSocketAddress(0); + Configuration configuration = new Configuration(); + Server server = RpcServerFactoryPBImpl.get().getServer( + ResourceTracker.class, resourceTracker, address, configuration, null, + 1); + server.start(); + this.resourceTracker = (ResourceTracker) RpcClientFactoryPBImpl.get() + .getClient( + ResourceTracker.class, 1, NetUtils.getConnectAddress(server), + configuration); } @Override - protected ResourceTracker getRMClient() { + protected ResourceTracker getRMClient() throws IOException { return resourceTracker; } @Override protected void stopRMProxy() { + if (this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } return; } } @@ -780,7 +796,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); - nhResponse.setSystemCredentialsForApps(appCredentials); + nhResponse.setSystemCredentialsForApps( + YarnServerBuilderUtils.convertToProtoFormat(appCredentials)); return nhResponse; } @@ -1702,7 +1719,8 @@ protected ContainerManagerImpl createContainerManager(Context context, @Test public void testConcurrentAccessToSystemCredentials(){ - final Map testCredentials = new HashMap<>(); + final Map testCredentials = + new HashMap<>(); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]); ApplicationId applicationId = ApplicationId.newInstance(123456, 120); testCredentials.put(applicationId, byteBuffer); @@ -1727,8 +1745,9 @@ public void run() { NodeHeartbeatResponse nodeHeartBeatResponse = newNodeHeartbeatResponse(0, NodeAction.NORMAL, null, null, null, null, 0); - nodeHeartBeatResponse.setSystemCredentialsForApps( - testCredentials); + nodeHeartBeatResponse + .setSystemCredentialsForApps(YarnServerBuilderUtils + .convertToProtoFormat(testCredentials)); NodeHeartbeatResponseProto proto = ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse) .getProto(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f829a4cbbc4..bf66034db07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -81,8 +82,8 @@ private final ConcurrentMap inactiveNodes = new ConcurrentHashMap(); - private final ConcurrentMap systemCredentials = - new ConcurrentHashMap(); + private final ConcurrentMap systemCredentials = + new ConcurrentHashMap(); private boolean isWorkPreservingRecoveryEnabled; @@ -124,6 +125,8 @@ private ProxyCAManager proxyCAManager; private VolumeManager volumeManager; + private AtomicLong tokenSequenceNo = new AtomicLong(1); + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -509,7 +512,7 @@ public void setSystemClock(Clock clock) { @Private @Unstable - public ConcurrentMap getSystemCredentialsForApps() { + public ConcurrentMap getSystemCredentialsForApps() { return systemCredentials; } @@ -583,4 +586,21 @@ public VolumeManager getVolumeManager() { public void setVolumeManager(VolumeManager volumeManager) { this.volumeManager = volumeManager; } + + /** + * Get token sequence no. + * + * @return the tokenSequenceNo + */ + public Long getTokenSequenceNo() { + return tokenSequenceNo.get(); + } + + /** + * Increment token sequence no. + * + */ + public void incrTokenSequenceNo() { + this.tokenSequenceNo.incrementAndGet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 4e9846c731b..887dbc560e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; @@ -29,6 +28,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -72,8 +72,8 @@ RMStateStore getStateStore(); ConcurrentMap getRMApps(); - - ConcurrentMap getSystemCredentialsForApps(); + + ConcurrentMap getSystemCredentialsForApps(); ConcurrentMap getInactiveRMNodes(); @@ -198,4 +198,8 @@ void setMultiNodeSortingManager( VolumeManager getVolumeManager(); void setVolumeManager(VolumeManager volumeManager); + + long getTokenSequenceNo(); + + void incrTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ab71134c93f..ba61d81ecb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -20,7 +20,6 @@ import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -35,6 +34,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -572,7 +572,7 @@ public void setSystemClock(Clock clock) { activeServiceContext.setSystemClock(clock); } - public ConcurrentMap getSystemCredentialsForApps() { + public ConcurrentMap getSystemCredentialsForApps() { return activeServiceContext.getSystemCredentialsForApps(); } @@ -666,4 +666,14 @@ public void setVolumeManager(VolumeManager volumeManager) { public NodeAttributesManager getNodeAttributesManager() { return activeServiceContext.getNodeAttributesManager(); } + + @Override + public long getTokenSequenceNo() { + return this.activeServiceContext.getTokenSequenceNo(); + } + + @Override + public void incrTokenSequenceNo() { + this.activeServiceContext.incrTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..9b51030f057 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -618,11 +618,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) populateKeys(request, nodeHeartBeatResponse); - ConcurrentMap systemCredentials = - rmContext.getSystemCredentialsForApps(); - if (!systemCredentials.isEmpty()) { - nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); - } + populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineV2Enabled) { // Return collectors' map that NM needs to know @@ -893,4 +889,29 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private void populateTokenSequenceNo(NodeHeartbeatRequest request, + NodeHeartbeatResponse nodeHeartBeatResponse) { + if (LOG.isDebugEnabled()) { + LOG.debug("Token sequence no received from heartbeat request: " + + request.getTokenSequenceNo() + ". Current token sequeunce no: " + + this.rmContext.getTokenSequenceNo() + + ". System credentials for apps size: " + + rmContext.getSystemCredentialsForApps().size()); + } + if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) { + ConcurrentMap systemCredentials = + rmContext.getSystemCredentialsForApps(); + if (!systemCredentials.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Sending System credentials for apps as part of NodeHeartbeat response."); + } + nodeHeartBeatResponse + .setSystemCredentialsForApps(systemCredentials.values()); + } + } + nodeHeartBeatResponse.setTokenSequenceNo( + this.rmContext.getTokenSequenceNo()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a9f8cd16bee..de8c30b77cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -64,10 +64,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -637,7 +639,7 @@ public Long run() throws Exception { // Request new hdfs token if the token is about to expire, and remove the old // token from the tokenToRenew list - private void requestNewHdfsDelegationTokenIfNeeded( + void requestNewHdfsDelegationTokenIfNeeded( final DelegationTokenToRenew dttr) throws IOException, InterruptedException { @@ -679,6 +681,7 @@ private void requestNewHdfsDelegationTokenAsProxyUser( Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + boolean incrTokenSequenceNo = false; if (!hasProxyUserPrivileges) { LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); return; @@ -703,14 +706,24 @@ private void requestNewHdfsDelegationTokenAsProxyUser( appTokens.get(applicationId).add(tokenToRenew); } LOG.info("Received new token " + token); + incrTokenSequenceNo = true; } } } + + if(incrTokenSequenceNo) { + this.rmContext.incrTokenSequenceNo(); + } + DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); for (ApplicationId applicationId : referringAppIds) { - rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto(applicationId, + byteBuffer); + rmContext.getSystemCredentialsForApps().put(applicationId, + systemCredentialsForAppsProto); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e2839552ee..fe3a889af9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -68,6 +68,7 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); private Set nodeLabels; + private long tokenSequenceNo; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -278,6 +279,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); req.setRegisteringCollectors(this.registeringCollectors); + req.setTokenSequenceNo(this.tokenSequenceNo); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); @@ -302,6 +304,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, capability = Resources.clone(newResource); } + this.tokenSequenceNo = heartbeatResponse.getTokenSequenceNo(); return heartbeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a8719932f3f..21fb6901639 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -119,6 +119,7 @@ public static Resource newAvailResource(Resource total, Resource used) { private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; private Resource physicalResource; + private RMContext rmContext; public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, @@ -141,6 +142,18 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.physicalResource = pPhysicalResource; } + public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, + Resource perNode, String rackName, String healthReport, + long lastHealthReportTime, int cmdPort, String hostName, + NodeState state, Set labels, + ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, Resource pPhysicalResource, + RMContext rmContext) { + this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport, + lastHealthReportTime, cmdPort, hostName, state, labels, + containersUtilization, nodeUtilization, pPhysicalResource); + this.rmContext = rmContext; + } @Override public NodeId getNodeID() { return this.nodeId; @@ -298,7 +311,7 @@ public void setNodeAttributes(String prefix, @Override public RMContext getRMContext() { - return null; + return this.rmContext; } @Override @@ -343,6 +356,26 @@ private static RMNode buildRMNode(int rack, final Resource perNode, containersUtilization, nodeUtilization, physicalResource); } + private static RMNode buildRMNode(int rack, final Resource perNode, + NodeState state, String httpAddr, int hostnum, String hostName, int port, + Set labels, ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, Resource physicalResource, + RMContext rmContext) { + final String rackName = "rack" + rack; + final int nid = hostnum; + final String nodeAddr = hostName + ":" + nid; + if (hostName == null) { + hostName = "host" + nid; + } + final NodeId nodeID = NodeId.newInstance(hostName, port); + + final String httpAddress = httpAddr; + String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; + return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, + healthReport, 0, nid, hostName, state, labels, containersUtilization, + nodeUtilization, physicalResource, rmContext); + } + public static RMNode nodeInfo(int rack, final Resource perNode, NodeState state) { return buildRMNode(rack, perNode, state, "N/A"); @@ -371,4 +404,9 @@ public static RMNode newNodeInfo(int rack, final Resource perNode, return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port); } + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, int port, RMContext rmContext) { + return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, + hostName, port, null, null, null, null, rmContext); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index b451db1a7d5..b01ad3b6d32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -19,10 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager; import org.apache.hadoop.net.ServerSocketUtil; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; + +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -33,6 +39,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -43,6 +50,8 @@ import java.util.Map; import java.util.Set; import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.xml.parsers.DocumentBuilderFactory; @@ -54,10 +63,13 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -78,9 +90,13 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.AttributeValue; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -100,6 +116,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -107,10 +124,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.junit.After; @@ -2447,4 +2468,141 @@ protected ResourceTrackerService createResourceTrackerService() { response.getNodeAction()); mockRM.stop(); } + + @Test(timeout = 5000) + public void testSystemCredentialsAfterTokenSequenceNoChange() + throws Exception { + + Configuration conf = new Configuration(); + + RMContext rmContext = mock(RMContextImpl.class); + + Dispatcher dispatcher = new InlineDispatcher(); + when(rmContext.getDispatcher()).thenReturn(dispatcher); + + NodeId nodeId = NodeId.newInstance("localhost", 1234); + ConcurrentMap rmNodes = + new ConcurrentHashMap(); + RMNode rmNode = MockNodes.newNodeInfo(1, Resource.newInstance(1024, 1), 1, + "localhost", 1234, rmContext); + rmNodes.put(nodeId, rmNode); + when(rmContext.getRMNodes()).thenReturn(rmNodes); + + ConcurrentMap inactiveNodes = + new ConcurrentHashMap(); + when(rmContext.getInactiveRMNodes()).thenReturn(inactiveNodes); + when(rmContext.getConfigurationProvider()) + .thenReturn(new LocalConfigurationProvider()); + + dispatcher.register(SchedulerEventType.class, + new InlineDispatcher.EmptyEventHandler()); + dispatcher.register(RMNodeEventType.class, + new NodeEventDispatcher(rmContext)); + + NMLivelinessMonitor nmLivelinessMonitor = + new NMLivelinessMonitor(dispatcher); + nmLivelinessMonitor.init(conf); + nmLivelinessMonitor.start(); + NodesListManager nodesListManager = new NodesListManager(rmContext); + nodesListManager.init(conf); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.start(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.start(); + ResourceTrackerService resourceTrackerService = new ResourceTrackerService( + rmContext, nodesListManager, nmLivelinessMonitor, + containerTokenSecretManager, nmTokenSecretManager); + + resourceTrackerService.init(conf); + resourceTrackerService.start(); + + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + RegisterNodeManagerRequest request = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + request.setNodeId(nodeId); + request.setHttpPort(1234); + request.setResource(BuilderUtils.newResource(1024, 1)); + resourceTrackerService.registerNodeManager(request); + + org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = + recordFactory.newRecordInstance( + org.apache.hadoop.yarn.server.api.records.NodeStatus.class); + nodeStatus.setNodeId(nodeId); + nodeStatus.setResponseId(0); + nodeStatus.setNodeHealthStatus( + recordFactory.newRecordInstance(NodeHealthStatus.class)); + nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true); + + NodeHeartbeatRequest request1 = + recordFactory.newRecordInstance(NodeHeartbeatRequest.class); + request1.setNodeStatus(nodeStatus); + + // Set NM's token sequence no as 1 + request1.setTokenSequenceNo(1); + + // Set RM's token sequence no as 1 + when(rmContext.getTokenSequenceNo()).thenReturn((long) 1); + + // Populate SystemCredentialsForApps + final ApplicationId appId = ApplicationId.newInstance(1234, 1); + Credentials app1Cred = new Credentials(); + + Token token = + new Token(); + token.setKind(new Text("kind1")); + app1Cred.addToken(new Text("token1"), token); + Token token2 = + new Token(); + token2.setKind(new Text("kind2")); + app1Cred.addToken(new Text("token2"), token2); + + DataOutputBuffer dob = new DataOutputBuffer(); + app1Cred.writeTokenStorageToStream(dob); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto(appId, + byteBuffer); + + ConcurrentHashMap systemCredentialsForApps = + new ConcurrentHashMap(1); + + systemCredentialsForApps.put(appId, systemCredentialsForAppsProto); + + when(rmContext.getSystemCredentialsForApps()) + .thenReturn(systemCredentialsForApps); + + // first ping + NodeHeartbeatResponse response = + resourceTrackerService.nodeHeartbeat(request1); + + // Though SystemCredentialsForApps size is 1, it is not being sent as part + // of response as there is no difference between NM's and RM's token + // sequence no + assertEquals(1, rmContext.getTokenSequenceNo()); + assertEquals(1, rmContext.getSystemCredentialsForApps().size()); + assertEquals(1, response.getTokenSequenceNo()); + assertEquals(0, response.getSystemCredentialsForApps().size()); + + // Set RM's token sequence no as 2 + when(rmContext.getTokenSequenceNo()).thenReturn((long) 2); + + // Ensure new heartbeat has been sent to avoid duplicate issues + nodeStatus.setResponseId(1); + request1.setNodeStatus(nodeStatus); + + // second ping + NodeHeartbeatResponse response1 = + resourceTrackerService.nodeHeartbeat(request1); + + // Since NM's and RM's token sequence no is different, response should + // contain SystemCredentialsForApps + assertEquals(2, response1.getTokenSequenceNo()); + assertEquals(1, response1.getSystemCredentialsForApps().size()); + + resourceTrackerService.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b327f1..9c06535480a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -35,6 +35,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.concurrent.BlockingQueue; @@ -80,7 +81,9 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -95,12 +98,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -146,8 +151,13 @@ public boolean isManaged(Token token) throws IOException { @Override public long renew(Token t, Configuration conf) throws IOException { if ( !(t instanceof MyToken)) { - // renew in 3 seconds - return System.currentTimeMillis() + 3000; + if(conf.get("override_token_expire_time") != null) { + return System.currentTimeMillis() + + Long.parseLong(conf.get("override_token_expire_time")); + } else { + // renew in 3 seconds + return System.currentTimeMillis() + 3000; + } } MyToken token = (MyToken)t; if(token.isCanceled()) { @@ -201,6 +211,7 @@ public void setUp() throws Exception { counter = new AtomicInteger(0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("override_token_expire_time", "3000"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); dispatcher = new AsyncDispatcher(eventQueue); @@ -209,7 +220,7 @@ public void setUp() throws Exception { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); when(mockContext.getDelegationTokenRenewer()).thenReturn( delegationTokenRenewer); when(mockContext.getDispatcher()).thenReturn(dispatcher); @@ -581,7 +592,7 @@ public void testDTKeepAlive1 () throws Exception { createNewDelegationTokenRenewer(lconf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -661,7 +672,7 @@ public void testDTKeepAlive2() throws Exception { createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -766,7 +777,7 @@ public void testDTRonAppSubmission() createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -825,7 +836,7 @@ public Long answer(InvocationOnMock invocation) createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -890,7 +901,7 @@ protected void doSecureLogin() throws IOException { } - @Test (timeout = 20000) + @Test(timeout = 20000) public void testReplaceExpiringDelegationToken() throws Exception { conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -969,8 +980,14 @@ public Boolean get() { new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1062,8 +1079,14 @@ protected void renewToken(final DelegationTokenToRenew dttr) new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1117,8 +1140,14 @@ public Boolean get() { new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1430,7 +1459,7 @@ public void testShutDown() { DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -1444,4 +1473,61 @@ public void testShutDown() { delegationTokenRenewer.applicationFinished( BuilderUtils.newApplicationId(0, 1)); } + + @Test(timeout = 10000) + public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + final Credentials credsx = new Credentials(); + + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier( + new Text("user1"), new Text("renewer"), new Text("user1")); + final Token expectedToken = + new Token(dtId1.getBytes(), + "password2".getBytes(), dtId1.getKind(), new Text("service2")); + + // fire up the renewer + final DelegationTokenRenewer dtr = new DelegationTokenRenewer() { + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(expectedToken.getService(), expectedToken); + return new Token[] { expectedToken }; + } + }; + + RMContext mockContext = mock(RMContext.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap()); + ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + InetSocketAddress sockAddr = + InetSocketAddress.createUnresolved("localhost", 1234); + when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + dtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); + dtr.init(conf); + dtr.start(); + + final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); + + Collection appIds = new ArrayList(1); + appIds.add(appId1); + + dtr.addApplicationSync(appId1, credsx, false, "user1"); + + // Ensure incrTokenSequenceNo has been called for new token request + Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo(); + + DelegationTokenToRenew dttr = new DelegationTokenToRenew(appIds, + expectedToken, conf, 1000, false, "user1"); + + dtr.requestNewHdfsDelegationTokenIfNeeded(dttr); + + // Ensure incrTokenSequenceNo has been called for token renewal as well. + Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); + } }