diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 4008a97f668..f175cf3e6cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; @@ -92,12 +94,16 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto; import org.apache.hadoop.yarn.server.api.ContainerType; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.google.protobuf.ByteString; @Private @Unstable public class ProtoUtils { + public static final Interner BYTE_STRING_INTERNER = + Interners.newWeakInterner(); /* * ContainerState @@ -578,6 +584,18 @@ public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) { TimedPlacementConstraintProto.DelayUnit u) { return TimedPlacementConstraint.DelayUnit.valueOf(u.name()); } + + /* + * ApplicationId + */ + public static ApplicationIdPBImpl convertFromProtoFormat( + ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + public static ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 4f9922507be..1b4339ba5d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -102,4 +102,8 @@ public abstract void setRegisteringCollectors(Map getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c721e15..2d0c9259a3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -91,9 +92,6 @@ public abstract void addAllContainersToSignal( // Credentials (i.e. hdfs tokens) needed by NodeManagers for application // localizations and logAggreations. public abstract Map getSystemCredentialsForApps(); - - public abstract void setSystemCredentialsForApps( - Map systemCredentials); public abstract boolean getAreNodeLabelsAcceptedByRM(); @@ -118,4 +116,11 @@ public abstract void setContainerQueuingLimit( public abstract void addAllContainersToDecrease( Collection containersToDecrease); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); + + public abstract void setSystemCredentialsForApps( + Map systemCredentials); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index c59127a74b3..7d96c8cad5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -455,4 +455,17 @@ public void setLogAggregationReportsForApps( } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc4aec..5e3929ae0a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,10 +26,6 @@ import java.util.List; import java.util.Map; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.protobuf.ByteString; - import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; @@ -39,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; @@ -77,6 +72,8 @@ private List containersToBeRemovedFromNM = null; private List applicationsToCleanup = null; private Map systemCredentials = null; + private Map systemCredentialsForAppsProto = + null; private Resource resource = null; private Map appCollectorsMap = null; @@ -88,9 +85,6 @@ private List containersToDecrease = null; private List containersToSignal = null; - private static final Interner BYTE_STRING_INTERNER = - Interners.newWeakInterner(); - public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -129,7 +123,7 @@ private void mergeLocalToBuilder() { builder.setContainerQueuingLimit( convertToProtoFormat(this.containerQueuingLimit)); } - if (this.systemCredentials != null) { + if (this.systemCredentialsForAppsProto != null) { addSystemCredentialsToProto(); } if (this.containersToUpdate != null) { @@ -152,11 +146,9 @@ private void mergeLocalToBuilder() { private void addSystemCredentialsToProto() { maybeInitBuilder(); builder.clearSystemCredentialsForApps(); - for (Map.Entry entry : systemCredentials.entrySet()) { - builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) - .setCredentialsForApp(BYTE_STRING_INTERNER.intern( - ProtoUtils.convertToProtoFormat(entry.getValue().duplicate())))); + for (Map.Entry entry : systemCredentialsForAppsProto + .entrySet()) { + builder.addSystemCredentialsForApps(entry.getValue()); } } @@ -168,7 +160,7 @@ private void addAppCollectorsMapToProto() { AppCollectorData data = entry.getValue(); AppCollectorDataProto.Builder appCollectorDataBuilder = AppCollectorDataProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) + .setAppId(ProtoUtils.convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion()); @@ -477,7 +469,7 @@ private void initApplicationsToCleanup() { this.applicationsToCleanup = new ArrayList(); for (ApplicationIdProto c : list) { - this.applicationsToCleanup.add(convertFromProtoFormat(c)); + this.applicationsToCleanup.add(ProtoUtils.convertFromProtoFormat(c)); } } @@ -510,7 +502,7 @@ public boolean hasNext() { @Override public ApplicationIdProto next() { - return convertToProtoFormat(iter.next()); + return ProtoUtils.convertToProtoFormat(iter.next()); } @Override @@ -665,9 +657,10 @@ public void remove() { private void initSystemCredentials() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; List list = p.getSystemCredentialsForAppsList(); - this.systemCredentials = new HashMap (); + this.systemCredentials = + new HashMap(); for (SystemCredentialsForAppsProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId()); ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp()); this.systemCredentials.put(appId, byteBuffer); } @@ -679,7 +672,7 @@ private void initAppCollectorsMap() { if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId()); Token collectorToken = null; if (c.hasAppCollectorToken()){ collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); @@ -694,13 +687,15 @@ private void initAppCollectorsMap() { @Override public void setSystemCredentialsForApps( - Map systemCredentials) { - if (systemCredentials == null || systemCredentials.isEmpty()) { + Map systemCredentialsForAppsProto) { + if (systemCredentialsForAppsProto == null + || systemCredentialsForAppsProto.isEmpty()) { return; } maybeInitBuilder(); - this.systemCredentials = new HashMap(); - this.systemCredentials.putAll(systemCredentials); + this.systemCredentialsForAppsProto = + new HashMap(); + this.systemCredentialsForAppsProto.putAll(systemCredentialsForAppsProto); } @Override @@ -742,14 +737,6 @@ private ResourceProto convertToProtoFormat(Resource t) { return ProtoUtils.convertToProtoFormat(t); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - private NodeAction convertFromProtoFormat(NodeActionProto p) { return NodeAction.valueOf(p.name()); } @@ -874,5 +861,18 @@ private TokenProto convertToProtoFormat(Token t) { private TokenPBImpl convertFromProtoFormat(TokenProto p) { return new TokenPBImpl(p); } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java index f3331855597..fb77affbff2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.utils; +import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -66,4 +69,22 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, } return response; } + + /** + * Build SystemCredentialsForAppsProto objects + * + * @param applicationId Application ID + * @param credentials HDFS Tokens + * @return systemCredentialsForAppsProto SystemCredentialsForAppsProto + */ + public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto( + ApplicationId applicationId, ByteBuffer credentials) { + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + SystemCredentialsForAppsProto.newBuilder() + .setAppId(ProtoUtils.convertToProtoFormat(applicationId)) + .setCredentialsForApp(ProtoUtils.BYTE_STRING_INTERNER.intern( + ProtoUtils.convertToProtoFormat(credentials.duplicate()))) + .build(); + return systemCredentialsForAppsProto; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a384d9..02a001249dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,7 @@ message NodeHeartbeatRequestProto { repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; optional NodeAttributesProto nodeAttributes = 7; + optional int64 tokenSequenceNo = 8; } message LogAggregationReportProto { @@ -128,6 +129,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional int64 tokenSequenceNo = 18; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 8b1d0bb49e5..c9612dd7ab1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +30,12 @@ import java.util.HashSet; import java.util.Map; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,12 +44,13 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; @@ -153,10 +161,12 @@ public void testNodeHeartbeatRequestPBImplWithNullLabels() { /** * Test NodeHeartbeatResponsePBImpl. + * + * @throws IOException */ @Test - public void testNodeHeartbeatResponsePBImpl() { + public void testNodeHeartbeatResponsePBImpl() throws IOException { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); original.setDiagnosticsMessage("testDiagnosticMessage"); @@ -168,6 +178,34 @@ public void testNodeHeartbeatResponsePBImpl() { Map collectors = getCollectors(false); original.setAppCollectors(collectors); + // create token1 + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token expectedToken1 = + new Token(dtId1.getBytes(), + "password12".getBytes(), dtId1.getKind(), new Text("service1")); + + Credentials credentials1 = new Credentials(); + credentials1.addToken(expectedToken1.getService(), expectedToken1); + + DataOutputBuffer dob1 = new DataOutputBuffer(); + credentials1.writeTokenStorageToStream(dob1); + + ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob1.getData(), 0, dob1.getLength()); + + SystemCredentialsForAppsProto systemCredentialsForAppsProto1 = + SystemCredentialsForAppsProto.newBuilder() + .setAppId(ProtoUtils.convertToProtoFormat(getApplicationId(1))) + .setCredentialsForApp( + ProtoUtils.convertToProtoFormat(byteBuffer1.duplicate())) + .build(); + + Map systemCredentials = + new HashMap(); + systemCredentials.put(getApplicationId(1), systemCredentialsForAppsProto1); + original.setSystemCredentialsForApps(systemCredentials); + NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); assertEquals(100, copy.getResponseId()); @@ -178,6 +216,20 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); + assertEquals(1, copy.getSystemCredentialsForApps().size()); + + Credentials credentials1Out = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + ByteBuffer buffer = + copy.getSystemCredentialsForApps().get(getApplicationId(1)); + Assert.assertNotNull(buffer); + buffer.rewind(); + buf.reset(buffer); + credentials1Out.readTokenStorageStream(buf); + assertEquals(1, credentials1Out.getAllTokens().size()); + // Ensure token1's password "password12" is available from proto response + assertEquals(10, + credentials1Out.getAllTokens().iterator().next().getPassword().length); } @Test @@ -376,7 +428,8 @@ public void testUnRegisterNodeManagerRequestPBImpl() throws Exception { AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); if (!hasNullCollectorToken) { data.setCollectorToken( - Token.newInstance(new byte[0], "kind", new byte[0], "s")); + org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0], + "kind", new byte[0], "s")); } Map collectorMap = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index e6e79d3f5dc..2f0d25a67aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -139,8 +141,8 @@ public void testRegisterNodeManagerRequest() { public void testNodeHeartBeatResponse() throws IOException { NodeHeartbeatResponse record = Records.newRecord(NodeHeartbeatResponse.class); - Map appCredentials = - new HashMap(); + Map appCredentials = + new HashMap(); Credentials app1Cred = new Credentials(); Token token1 = @@ -154,14 +156,24 @@ public void testNodeHeartBeatResponse() throws IOException { DataOutputBuffer dob = new DataOutputBuffer(); app1Cred.writeTokenStorageToStream(dob); - ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto( + ApplicationId.newInstance(1234, 1), byteBuffer); + + appCredentials.put(ApplicationId.newInstance(1234, 1), + systemCredentialsForAppsProto); record.setSystemCredentialsForApps(appCredentials); + Map appCredentialsOut = + new HashMap(); + appCredentialsOut.put(ApplicationId.newInstance(1234, 1), byteBuffer); + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( ((NodeHeartbeatResponsePBImpl) record).getProto()); - Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps()); + Assert.assertEquals(appCredentialsOut, proto.getSystemCredentialsForApps()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92c277..c069baf2ea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -157,6 +157,7 @@ private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; private NodeAttributesProvider nodeAttributesProvider; + private long tokenSequenceNo; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -1145,6 +1146,8 @@ public void run() { } } + request.setTokenSequenceNo( + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1227,6 +1230,8 @@ public void run() { updateTimelineCollectorData(response); } + NodeStatusUpdaterImpl.this.tokenSequenceNo = + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 8435340164a..f94b5f0ed2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.EOFException; @@ -79,7 +78,10 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl; +import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -112,6 +114,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater extends NodeManagerTestBase { @@ -325,16 +329,29 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(context, dispatcher, healthChecker, metrics); - resourceTracker = new MyResourceTracker4(context); + ResourceTracker resourceTracker = new MyResourceTracker4(context); + InetSocketAddress address = new InetSocketAddress(0); + Configuration configuration = new Configuration(); + Server server = RpcServerFactoryPBImpl.get().getServer( + ResourceTracker.class, resourceTracker, address, configuration, null, + 1); + server.start(); + this.resourceTracker = (ResourceTracker) RpcClientFactoryPBImpl.get() + .getClient( + ResourceTracker.class, 1, NetUtils.getConnectAddress(server), + configuration); } @Override - protected ResourceTracker getRMClient() { + protected ResourceTracker getRMClient() throws IOException { return resourceTracker; } @Override protected void stopRMProxy() { + if (this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } return; } } @@ -773,13 +790,17 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM); - Map appCredentials = - new HashMap(); + Map appCredentials = + new HashMap(); DataOutputBuffer dob = new DataOutputBuffer(); expectedCredentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto( + ApplicationId.newInstance(1234, 1), byteBuffer1); + appCredentials.put(ApplicationId.newInstance(1234, 1), + systemCredentialsForAppsProto); nhResponse.setSystemCredentialsForApps(appCredentials); return nhResponse; } @@ -1702,10 +1723,14 @@ protected ContainerManagerImpl createContainerManager(Context context, @Test public void testConcurrentAccessToSystemCredentials(){ - final Map testCredentials = new HashMap<>(); + final Map testCredentials = + new HashMap<>(); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]); ApplicationId applicationId = ApplicationId.newInstance(123456, 120); - testCredentials.put(applicationId, byteBuffer); + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto(applicationId, + byteBuffer); + testCredentials.put(applicationId, systemCredentialsForAppsProto); final List exceptions = Collections.synchronizedList(new ArrayList()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f829a4cbbc4..bf66034db07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -81,8 +82,8 @@ private final ConcurrentMap inactiveNodes = new ConcurrentHashMap(); - private final ConcurrentMap systemCredentials = - new ConcurrentHashMap(); + private final ConcurrentMap systemCredentials = + new ConcurrentHashMap(); private boolean isWorkPreservingRecoveryEnabled; @@ -124,6 +125,8 @@ private ProxyCAManager proxyCAManager; private VolumeManager volumeManager; + private AtomicLong tokenSequenceNo = new AtomicLong(1); + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -509,7 +512,7 @@ public void setSystemClock(Clock clock) { @Private @Unstable - public ConcurrentMap getSystemCredentialsForApps() { + public ConcurrentMap getSystemCredentialsForApps() { return systemCredentials; } @@ -583,4 +586,21 @@ public VolumeManager getVolumeManager() { public void setVolumeManager(VolumeManager volumeManager) { this.volumeManager = volumeManager; } + + /** + * Get token sequence no. + * + * @return the tokenSequenceNo + */ + public Long getTokenSequenceNo() { + return tokenSequenceNo.get(); + } + + /** + * Increment token sequence no. + * + */ + public void incrTokenSequenceNo() { + this.tokenSequenceNo.incrementAndGet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 4e9846c731b..6c28eb91437 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; @@ -29,6 +28,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -72,8 +72,8 @@ RMStateStore getStateStore(); ConcurrentMap getRMApps(); - - ConcurrentMap getSystemCredentialsForApps(); + + ConcurrentMap getSystemCredentialsForApps(); ConcurrentMap getInactiveRMNodes(); @@ -198,4 +198,8 @@ void setMultiNodeSortingManager( VolumeManager getVolumeManager(); void setVolumeManager(VolumeManager volumeManager); + + long getTokenSequenceNo(); + + void incrTokenSeqeuenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ab71134c93f..61f9c8d7cf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -20,7 +20,6 @@ import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -35,6 +34,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -572,7 +572,7 @@ public void setSystemClock(Clock clock) { activeServiceContext.setSystemClock(clock); } - public ConcurrentMap getSystemCredentialsForApps() { + public ConcurrentMap getSystemCredentialsForApps() { return activeServiceContext.getSystemCredentialsForApps(); } @@ -666,4 +666,14 @@ public void setVolumeManager(VolumeManager volumeManager) { public NodeAttributesManager getNodeAttributesManager() { return activeServiceContext.getNodeAttributesManager(); } + + @Override + public long getTokenSequenceNo() { + return this.activeServiceContext.getTokenSequenceNo(); + } + + @Override + public void incrTokenSeqeuenceNo() { + this.activeServiceContext.incrTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..ab3ad723bd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -618,11 +618,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) populateKeys(request, nodeHeartBeatResponse); - ConcurrentMap systemCredentials = - rmContext.getSystemCredentialsForApps(); - if (!systemCredentials.isEmpty()) { - nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); - } + populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineV2Enabled) { // Return collectors' map that NM needs to know @@ -893,4 +889,26 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private void populateTokenSequenceNo(NodeHeartbeatRequest request, + NodeHeartbeatResponse nodeHeartBeatResponse) { + if (LOG.isDebugEnabled()) { + LOG.debug("Token sequence no received from hearbeat request: " + + request.getTokenSequenceNo() + ". Current token sequeunce no: " + + this.rmContext.getTokenSequenceNo()); + } + if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) { + ConcurrentMap systemCredentials = + rmContext.getSystemCredentialsForApps(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "System credentials for apps size: " + systemCredentials.size()); + } + if (!systemCredentials.isEmpty()) { + nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); + } + } + nodeHeartBeatResponse.setTokenSequenceNo( + this.rmContext.getTokenSequenceNo()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a9f8cd16bee..5569d92aaf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -64,10 +64,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -679,6 +681,7 @@ private void requestNewHdfsDelegationTokenAsProxyUser( Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + boolean incrTokenSequenceNo = false; if (!hasProxyUserPrivileges) { LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); return; @@ -703,14 +706,24 @@ private void requestNewHdfsDelegationTokenAsProxyUser( appTokens.get(applicationId).add(tokenToRenew); } LOG.info("Received new token " + token); + incrTokenSequenceNo = true; } } } + + if(incrTokenSequenceNo) { + this.rmContext.incrTokenSeqeuenceNo(); + } + DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); for (ApplicationId applicationId : referringAppIds) { - rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); + SystemCredentialsForAppsProto systemCredentialsForAppsProto = + YarnServerBuilderUtils.newSystemCredentialsForAppsProto(applicationId, + byteBuffer); + rmContext.getSystemCredentialsForApps().put(applicationId, + systemCredentialsForAppsProto); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e2839552ee..fe3a889af9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -68,6 +68,7 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); private Set nodeLabels; + private long tokenSequenceNo; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -278,6 +279,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); req.setRegisteringCollectors(this.registeringCollectors); + req.setTokenSequenceNo(this.tokenSequenceNo); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); @@ -302,6 +304,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, capability = Resources.clone(newResource); } + this.tokenSequenceNo = heartbeatResponse.getTokenSequenceNo(); return heartbeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b327f1..1c34c6720a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -80,7 +80,9 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -146,8 +148,13 @@ public boolean isManaged(Token token) throws IOException { @Override public long renew(Token t, Configuration conf) throws IOException { if ( !(t instanceof MyToken)) { - // renew in 3 seconds - return System.currentTimeMillis() + 3000; + if(conf.get("override_token_expire_time") != null) { + return System.currentTimeMillis() + + Long.parseLong(conf.get("override_token_expire_time")); + } else { + // renew in 3 seconds + return System.currentTimeMillis() + 3000; + } } MyToken token = (MyToken)t; if(token.isCanceled()) { @@ -201,6 +208,7 @@ public void setUp() throws Exception { counter = new AtomicInteger(0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("override_token_expire_time", "3000"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); dispatcher = new AsyncDispatcher(eventQueue); @@ -209,7 +217,7 @@ public void setUp() throws Exception { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); when(mockContext.getDelegationTokenRenewer()).thenReturn( delegationTokenRenewer); when(mockContext.getDispatcher()).thenReturn(dispatcher); @@ -581,7 +589,7 @@ public void testDTKeepAlive1 () throws Exception { createNewDelegationTokenRenewer(lconf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -661,7 +669,7 @@ public void testDTKeepAlive2() throws Exception { createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -766,7 +774,7 @@ public void testDTRonAppSubmission() createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -825,7 +833,7 @@ public Long answer(InvocationOnMock invocation) createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -890,7 +898,7 @@ protected void doSecureLogin() throws IOException { } - @Test (timeout = 20000) + @Test(timeout = 20000) public void testReplaceExpiringDelegationToken() throws Exception { conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -969,8 +977,12 @@ public Boolean get() { new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + proto.getSystemCredentialsForApps().get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1062,8 +1074,12 @@ protected void renewToken(final DelegationTokenToRenew dttr) new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + proto.getSystemCredentialsForApps().get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1117,8 +1133,12 @@ public Boolean get() { new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + ByteBuffer tokenBuffer = - response.getSystemCredentialsForApps().get(app.getApplicationId()); + proto.getSystemCredentialsForApps().get(app.getApplicationId()); Assert.assertNotNull(tokenBuffer); Credentials appCredentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); @@ -1430,7 +1450,7 @@ public void testShutDown() { DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( - new ConcurrentHashMap()); + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -1444,4 +1464,127 @@ public void testShutDown() { delegationTokenRenewer.applicationFinished( BuilderUtils.newApplicationId(0, 1)); } + + @Test(timeout = 20000) + public void testTokenSequencNo() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + // create Token1: + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + + // set max date to 0 to simulate an expiring token; + dtId1.setMaxDate(0); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + + // create token2 + Text userText2 = new Text("user2"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token expectedToken = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + + // Set token expire time to 60s. + conf.set("override_token_expire_time", "5000"); + expectedToken.renew(conf); + + final MockRM rm = new TestSecurityMockRM(conf, null) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(expectedToken.getService(), expectedToken); + return new Token[] {expectedToken}; + } + }; + } + }; + rm.start(); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + assertEquals(0, proto.getSystemCredentialsForApps().size()); + + RMApp app = rm.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + // wait for the initial expiring hdfs token to be removed from allTokens + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return + rm.getRMContext().getDelegationTokenRenewer().getAllTokens() + .get(token1) == null; + } + }, 2000, 7000); + + // wait for the initial expiring hdfs token to be removed from appTokens + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return !rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(token1); + } + }, 2000, 7000); + + // wait for the new retrieved hdfs token. + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(expectedToken); + } + }, 2000, 7000); + + NodeHeartbeatResponse response4 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse proto4 = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response4).getProto()); + assertEquals(1, proto4.getSystemCredentialsForApps().size()); + + ByteBuffer tokenBuffer = + response4.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken)); + + // wait for the hdfs tokens expiration so that renewal happens and + // creds would get passed as part of heartbeat response + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + try { + NodeHeartbeatResponse response7 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse proto7 = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response7).getProto()); + if (proto7.getSystemCredentialsForApps().size() == 1) { + assertEquals(1, proto7.getSystemCredentialsForApps().size()); + return true; + } else { + return false; + } + } catch (Exception e) { + return false; + } + } + }, 2000, 7000); + } }