diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index fee72e8..c2d6483 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -79,6 +81,7 @@ */ @Public @Stable + @Idempotent public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException; @@ -103,6 +106,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( */ @Public @Stable + @Idempotent public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException; @@ -162,6 +166,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( */ @Public @Stable + @AtMostOnce public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 4f3cab2..641986a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -24,8 +24,11 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -33,8 +36,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; @@ -63,18 +70,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -82,6 +96,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -96,6 +111,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; @@ -257,11 +273,13 @@ public void run() { } protected void startHACluster(int numOfNMs, boolean overrideClientRMService, - boolean overrideRTS) throws Exception { + boolean overrideRTS, boolean overrideApplicationMasterService) + throws Exception { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster = new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2, - numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS); + numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS, + overrideApplicationMasterService); cluster.resetStartFailoverFlag(false); cluster.init(conf); cluster.start(); @@ -285,17 +303,19 @@ protected ResourceManager getActiveRM() { private boolean overrideClientRMService; private boolean overrideRTS; + private boolean overrideApplicationMasterService; private final AtomicBoolean startFailover = new AtomicBoolean(false); private final AtomicBoolean failoverTriggered = new AtomicBoolean(false); public MiniYARNClusterForHATesting(String testName, int numResourceManagers, int numNodeManagers, int numLocalDirs, int numLogDirs, boolean enableAHS, boolean overrideClientRMService, - boolean overrideRTS) { + boolean overrideRTS, boolean overrideApplicationMasterService) { super(testName, numResourceManagers, numNodeManagers, numLocalDirs, numLogDirs, enableAHS); this.overrideClientRMService = overrideClientRMService; this.overrideRTS = overrideRTS; + this.overrideApplicationMasterService = overrideApplicationMasterService; } public boolean getStartFailoverFlag() { @@ -324,6 +344,11 @@ private boolean waittingForFailOver() { if (count >= maximumWaittingTime) { return false; } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // DO NOTHING + } return true; } @@ -354,6 +379,14 @@ protected ResourceTrackerService createResourceTrackerService() { } return super.createResourceTrackerService(); } + @Override + protected ApplicationMasterService createApplicationMasterService() { + if (overrideApplicationMasterService) { + return new CustomedApplicationMasterService(this.rmContext, + this.scheduler); + } + return super.createApplicationMasterService(); + } }; } @@ -717,5 +750,63 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return super.nodeHeartbeat(request); } } + + private class CustomedApplicationMasterService extends + ApplicationMasterService { + public CustomedApplicationMasterService(RMContext rmContext, + YarnScheduler scheduler) { + super(rmContext, scheduler); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeAllocateResponse(); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeRegisterApplicationMasterResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return FinishApplicationMasterResponse.newInstance(true); + } + } + + public RegisterApplicationMasterResponse + createFakeRegisterApplicationMasterResponse() { + Resource minCapability = Resource.newInstance(2048, 2); + Resource maxCapability = Resource.newInstance(4096, 4); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.MODIFY_APP, "*"); + ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes()); + return RegisterApplicationMasterResponse.newInstance(minCapability, + maxCapability, acls, key, new ArrayList(), "root", + new ArrayList()); + } + + public AllocateResponse createFakeAllocateResponse() { + return AllocateResponse.newInstance(-1, + new ArrayList(), + new ArrayList(), new ArrayList(), + Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, + null, new ArrayList()); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java index 2aa6cc6..bfc6656 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java @@ -51,7 +51,7 @@ @Before public void initiate() throws Exception { - startHACluster(1, true, false); + startHACluster(1, true, false, false); Configuration conf = new YarnConfiguration(this.conf); client = createAndStartYarnClient(conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java new file mode 100644 index 0000000..58827fb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java @@ -0,0 +1,123 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import junit.framework.Assert; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId ; + RMAppAttempt appAttempt; + + @Before + public void initiate() throws Exception { + startHACluster(0, false, false, true); + attemptId = this.cluster.createFakeApplicationAttemptId(); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + + AMRMTokenIdentifier id = + new AMRMTokenIdentifier(attemptId); + Token appToken = + new Token(id, this.cluster.getResourceManager() + .getRMContext().getAMRMTokenSecretManager()); + appToken.setService(new Text("appToken service")); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser() + .getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + @Test(timeout = 15000) + public void testAllocateOnHA() throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList(), + new ArrayList(), + ResourceBlacklistRequest.newInstance(new ArrayList(), + new ArrayList())); + AllocateResponse response = amClient.allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + } + + @Test(timeout = 15000) + public void testRegisterApplicationMasterOnHA() throws YarnException, + IOException { + RegisterApplicationMasterRequest request = + RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); + RegisterApplicationMasterResponse response = + amClient.registerApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeRegisterApplicationMasterResponse()); + } + + @Test(timeout = 15000) + public void testFinishApplicationMasterOnHA() throws YarnException, + IOException { + FinishApplicationMasterRequest request = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + amClient.finishApplicationMaster(request); + } + + private void syncToken(Token token) { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + ResourceManager rm = this.cluster.getResourceManager(i); + if (rm.getRMContext().getAMRMTokenSecretManager().getPasswords() + .get(attemptId) == null) { + rm.getRMContext().getAMRMTokenSecretManager().getPasswords() + .put(attemptId, token.getPassword()); + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index f2d8bc2..498dbe3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -41,7 +41,7 @@ @Before public void initiate() throws Exception { - startHACluster(0, false, true); + startHACluster(0, false, true, false); this.resourceTracker = getRMClient(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java index 5d21ec0..319af82 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java @@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import com.google.common.annotations.VisibleForTesting; + /** * AMRM-tokens are per ApplicationAttempt. If users redistribute their * tokens, it is their headache, god save them. I mean you are not supposed to @@ -167,4 +169,8 @@ public AMRMTokenIdentifier createIdentifier() { return new AMRMTokenIdentifier(); } + @VisibleForTesting + public Map getPasswords() { + return this.passwords; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 8632815..f641664 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -739,4 +739,8 @@ protected void doSecureLogin() throws IOException { } }; } + + public int getNumOfResourceManager() { + return this.resourceManagers.length; + } }