diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 4008a97f668..36758671ba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; @@ -578,6 +580,15 @@ public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) { TimedPlacementConstraintProto.DelayUnit u) { return TimedPlacementConstraint.DelayUnit.valueOf(u.name()); } + + public static ApplicationIdPBImpl convertFromProtoFormat( + ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + public static ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 4f9922507be..1b4339ba5d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -102,4 +102,8 @@ public abstract void setRegisteringCollectors(Map getNodeAttributes(); public abstract void setNodeAttributes(Set nodeAttributes); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c721e15..170d1cbca93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -118,4 +119,8 @@ public abstract void setContainerQueuingLimit( public abstract void addAllContainersToDecrease( Collection containersToDecrease); + + public abstract void setTokenSequenceNo(long tokenSequenceNo); + + public abstract long getTokenSequenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SystemCredentialsForApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SystemCredentialsForApps.java index e69de29bb2d..00b63d49469 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SystemCredentialsForApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SystemCredentialsForApps.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public abstract class SystemCredentialsForApps { + + public abstract void setApplicationId(ApplicationId applicationID); + + public abstract void setCredentials(ByteBuffer credentials); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index c59127a74b3..7d96c8cad5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -455,4 +455,17 @@ public void setLogAggregationReportsForApps( } this.logAggregationReportsForApps = logAggregationStatusForApps; } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc4aec..7a47ddae2d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,10 +26,8 @@ import java.util.List; import java.util.Map; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.protobuf.ByteString; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; @@ -39,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; @@ -63,6 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; /** * PBImpl class for NodeHeartbeatResponse. @@ -88,9 +86,6 @@ private List containersToDecrease = null; private List containersToSignal = null; - private static final Interner BYTE_STRING_INTERNER = - Interners.newWeakInterner(); - public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -129,9 +124,6 @@ private void mergeLocalToBuilder() { builder.setContainerQueuingLimit( convertToProtoFormat(this.containerQueuingLimit)); } - if (this.systemCredentials != null) { - addSystemCredentialsToProto(); - } if (this.containersToUpdate != null) { addContainersToUpdateToProto(); } @@ -149,17 +141,6 @@ private void mergeLocalToBuilder() { } } - private void addSystemCredentialsToProto() { - maybeInitBuilder(); - builder.clearSystemCredentialsForApps(); - for (Map.Entry entry : systemCredentials.entrySet()) { - builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) - .setCredentialsForApp(BYTE_STRING_INTERNER.intern( - ProtoUtils.convertToProtoFormat(entry.getValue().duplicate())))); - } - } - private void addAppCollectorsMapToProto() { maybeInitBuilder(); builder.clearAppCollectors(); @@ -168,7 +149,7 @@ private void addAppCollectorsMapToProto() { AppCollectorData data = entry.getValue(); AppCollectorDataProto.Builder appCollectorDataBuilder = AppCollectorDataProto.newBuilder() - .setAppId(convertToProtoFormat(entry.getKey())) + .setAppId(ProtoUtils.convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion()); @@ -477,7 +458,7 @@ private void initApplicationsToCleanup() { this.applicationsToCleanup = new ArrayList(); for (ApplicationIdProto c : list) { - this.applicationsToCleanup.add(convertFromProtoFormat(c)); + this.applicationsToCleanup.add(ProtoUtils.convertFromProtoFormat(c)); } } @@ -510,7 +491,7 @@ public boolean hasNext() { @Override public ApplicationIdProto next() { - return convertToProtoFormat(iter.next()); + return ProtoUtils.convertToProtoFormat(iter.next()); } @Override @@ -667,7 +648,7 @@ private void initSystemCredentials() { List list = p.getSystemCredentialsForAppsList(); this.systemCredentials = new HashMap (); for (SystemCredentialsForAppsProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId()); ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp()); this.systemCredentials.put(appId, byteBuffer); } @@ -679,7 +660,7 @@ private void initAppCollectorsMap() { if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId()); Token collectorToken = null; if (c.hasAppCollectorToken()){ collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); @@ -699,8 +680,15 @@ public void setSystemCredentialsForApps( return; } maybeInitBuilder(); + this.builder.clearSystemCredentialsForApps(); this.systemCredentials = new HashMap(); - this.systemCredentials.putAll(systemCredentials); + for (Map.Entry entry : systemCredentials + .entrySet()) { + this.builder.addSystemCredentialsForApps(((SystemCredentialsForAppsPBImpl) YarnServerBuilderUtils + .newSystemCredentialsForApps(entry.getKey(), entry.getValue())) + .getProto()); + this.systemCredentials.put(entry.getKey(), entry.getValue()); + } } @Override @@ -742,14 +730,6 @@ private ResourceProto convertToProtoFormat(Resource t) { return ProtoUtils.convertToProtoFormat(t); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - private NodeAction convertFromProtoFormat(NodeActionProto p) { return NodeAction.valueOf(p.name()); } @@ -874,5 +854,18 @@ private TokenProto convertToProtoFormat(Token t) { private TokenPBImpl convertFromProtoFormat(TokenProto p) { return new TokenPBImpl(p); } + + @Override + public void setTokenSequenceNo(long tokenSequenceNo) { + maybeInitBuilder(); + this.builder.setTokenSequenceNo(tokenSequenceNo); + } + + @Override + public long getTokenSequenceNo() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SystemCredentialsForAppsPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SystemCredentialsForAppsPBImpl.java index e69de29bb2d..155d1df55a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SystemCredentialsForAppsPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SystemCredentialsForAppsPBImpl.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.SystemCredentialsForApps; + +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.google.protobuf.ByteString; + +/** + * PBImpl class for SystemCredentialsForApps. + */ +public class SystemCredentialsForAppsPBImpl extends SystemCredentialsForApps { + + private SystemCredentialsForAppsProto proto = + SystemCredentialsForAppsProto.getDefaultInstance(); + private SystemCredentialsForAppsProto.Builder builder = null; + private boolean viaProto = false; + + private ApplicationId applicationId; + private ByteBuffer credentials; + + private static final Interner BYTE_STRING_INTERNER = + Interners.newWeakInterner(); + + public SystemCredentialsForAppsPBImpl() { + builder = SystemCredentialsForAppsProto.newBuilder(); + } + + public SystemCredentialsForAppsPBImpl(SystemCredentialsForAppsProto proto) { + this.proto = proto; + viaProto = true; + } + + public SystemCredentialsForAppsProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SystemCredentialsForAppsProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setAppId(ProtoUtils.convertToProtoFormat(this.applicationId)); + } + + if (this.credentials != null) { + builder.setCredentialsForApp(BYTE_STRING_INTERNER.intern( + ProtoUtils.convertToProtoFormat( + this.credentials.duplicate()))); + } + } + + @Override + public void setApplicationId(ApplicationId applicationID) { + this.applicationId = applicationID; + } + + @Override + public void setCredentials(ByteBuffer credentials) { + this.credentials = credentials; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java index f3331855597..5593a95568f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.utils; +import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -25,6 +26,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SystemCredentialsForApps; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -66,4 +68,14 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, } return response; } + + public static SystemCredentialsForApps newSystemCredentialsForApps( + ApplicationId applicationID, ByteBuffer credentials) { + SystemCredentialsForApps systemCredentialsForApps = + recordFactory.newRecordInstance(SystemCredentialsForApps.class); + systemCredentialsForApps.setApplicationId(applicationID); + systemCredentialsForApps.setCredentials(credentials); + return systemCredentialsForApps; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a384d9..02a001249dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,7 @@ message NodeHeartbeatRequestProto { repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; optional NodeAttributesProto nodeAttributes = 7; + optional int64 tokenSequenceNo = 8; } message LogAggregationReportProto { @@ -128,6 +129,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional int64 tokenSequenceNo = 18; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 8b1d0bb49e5..41b071561b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +30,19 @@ import java.util.HashSet; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.security.token.delegation.DelegationKey; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; +//import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -153,10 +167,12 @@ public void testNodeHeartbeatRequestPBImplWithNullLabels() { /** * Test NodeHeartbeatResponsePBImpl. + * + * @throws IOException */ @Test - public void testNodeHeartbeatResponsePBImpl() { + public void testNodeHeartbeatResponsePBImpl() throws IOException { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); original.setDiagnosticsMessage("testDiagnosticMessage"); @@ -168,6 +184,44 @@ public void testNodeHeartbeatResponsePBImpl() { Map collectors = getCollectors(false); original.setAppCollectors(collectors); + // create token2 + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token expectedToken1 = + new Token(dtId1.getBytes(), + "password12".getBytes(), dtId1.getKind(), new Text("service1")); + + Credentials credentials1 = new Credentials(); + credentials1.addToken(expectedToken1.getService(), expectedToken1); + + DataOutputBuffer dob1 = new DataOutputBuffer(); + credentials1.writeTokenStorageToStream(dob1); + + ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob1.getData(), 0, dob1.getLength()); + + // create token2 + Text userText2 = new Text("user2"); + DelegationTokenIdentifier dtId2 = new DelegationTokenIdentifier(userText2, + new Text("renewer2"), userText2); + final Token expectedToken2 = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + + Credentials credentials2 = new Credentials(); + credentials2.addToken(expectedToken2.getService(), expectedToken2); + + DataOutputBuffer dob2 = new DataOutputBuffer(); + credentials2.writeTokenStorageToStream(dob2); + ByteBuffer byteBuffer2 = + ByteBuffer.wrap(dob2.getData(), 0, dob2.getLength()); + + Map systemCredentials = + new HashMap(); + systemCredentials.put(getApplicationId(1), byteBuffer1); + systemCredentials.put(getApplicationId(2), byteBuffer2); + original.setSystemCredentialsForApps(systemCredentials); + NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); assertEquals(100, copy.getResponseId()); @@ -178,6 +232,36 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); + assertEquals(2, copy.getSystemCredentialsForApps().size()); + + Credentials credentials1Out = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + ByteBuffer buffer = + copy.getSystemCredentialsForApps().get(getApplicationId(1)); + Assert.assertNotNull(buffer); + buffer.rewind(); + buf.reset(buffer); + credentials1Out.readTokenStorageStream(buf); + assertEquals(1, credentials1Out.getAllTokens().size()); + // Ensure token1's password "password12" is available from proto response + assertEquals(10, + credentials1Out.getAllTokens().iterator().next().getPassword().length); + + NodeHeartbeatResponsePBImpl copy2 = + new NodeHeartbeatResponsePBImpl(original.getProto()); + + Credentials credentials1Out2 = new Credentials(); + DataInputByteBuffer buf2 = new DataInputByteBuffer(); + ByteBuffer buffer2 = + copy2.getSystemCredentialsForApps().get(getApplicationId(1)); + Assert.assertNotNull(buffer2); + buffer2.rewind(); + buf2.reset(buffer2); + credentials1Out2.readTokenStorageStream(buf2); + assertEquals(1, credentials1Out2.getAllTokens().size()); + // Ensure token1's password "password12" is available from proto response + assertEquals(10, + credentials1Out2.getAllTokens().iterator().next().getPassword().length); } @Test @@ -376,7 +460,8 @@ public void testUnRegisterNodeManagerRequestPBImpl() throws Exception { AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); if (!hasNullCollectorToken) { data.setCollectorToken( - Token.newInstance(new byte[0], "kind", new byte[0], "s")); + org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0], + "kind", new byte[0], "s")); } Map collectorMap = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92c277..c069baf2ea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -157,6 +157,7 @@ private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; private NodeAttributesProvider nodeAttributesProvider; + private long tokenSequenceNo; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -1145,6 +1146,8 @@ public void run() { } } + request.setTokenSequenceNo( + NodeStatusUpdaterImpl.this.tokenSequenceNo); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -1227,6 +1230,8 @@ public void run() { updateTimelineCollectorData(response); } + NodeStatusUpdaterImpl.this.tokenSequenceNo = + response.getTokenSequenceNo(); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f829a4cbbc4..d592eafff9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -124,6 +125,8 @@ private ProxyCAManager proxyCAManager; private VolumeManager volumeManager; + private AtomicLong tokenSequenceNo = new AtomicLong(1); + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -583,4 +586,21 @@ public VolumeManager getVolumeManager() { public void setVolumeManager(VolumeManager volumeManager) { this.volumeManager = volumeManager; } + + /** + * Get token sequence no. + * + * @return the tokenSequenceNo + */ + public Long getTokenSequenceNo() { + return tokenSequenceNo.get(); + } + + /** + * Increment token sequence no. + * + */ + public void incrTokenSequenceNo() { + this.tokenSequenceNo.incrementAndGet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 4e9846c731b..9adc9c857a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -198,4 +198,8 @@ void setMultiNodeSortingManager( VolumeManager getVolumeManager(); void setVolumeManager(VolumeManager volumeManager); + + long getTokenSequenceNo(); + + void incrTokenSeqeuenceNo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ab71134c93f..62467b528d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -666,4 +666,14 @@ public void setVolumeManager(VolumeManager volumeManager) { public NodeAttributesManager getNodeAttributesManager() { return activeServiceContext.getNodeAttributesManager(); } + + @Override + public long getTokenSequenceNo() { + return this.activeServiceContext.getTokenSequenceNo(); + } + + @Override + public void incrTokenSeqeuenceNo() { + this.activeServiceContext.incrTokenSequenceNo(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..06cb6bc50d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -618,11 +618,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) populateKeys(request, nodeHeartBeatResponse); - ConcurrentMap systemCredentials = - rmContext.getSystemCredentialsForApps(); - if (!systemCredentials.isEmpty()) { - nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); - } + populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineV2Enabled) { // Return collectors' map that NM needs to know @@ -893,4 +889,22 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private void populateTokenSequenceNo(NodeHeartbeatRequest request, + NodeHeartbeatResponse nodeHeartBeatResponse) { + LOG.debug("Token sequence no received from hearbeat request: " + + request.getTokenSequenceNo() + ". Current token sequeunce no: " + + this.rmContext.getTokenSequenceNo()); + if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) { + ConcurrentMap systemCredentials = + rmContext.getSystemCredentialsForApps(); + LOG.debug("System credentials for apps size: " + + systemCredentials.size()); + if (!systemCredentials.isEmpty()) { + nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); + } + } + nodeHeartBeatResponse.setTokenSequenceNo( + this.rmContext.getTokenSequenceNo()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a9f8cd16bee..1f8e6250309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -679,6 +679,7 @@ private void requestNewHdfsDelegationTokenAsProxyUser( Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + boolean incrTokenSequenceNo = false; if (!hasProxyUserPrivileges) { LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); return; @@ -703,9 +704,15 @@ private void requestNewHdfsDelegationTokenAsProxyUser( appTokens.get(applicationId).add(tokenToRenew); } LOG.info("Received new token " + token); + incrTokenSequenceNo = true; } } } + + if(incrTokenSequenceNo) { + this.rmContext.incrTokenSeqeuenceNo(); + } + DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());