diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index f4ae9be..53a93c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -80,6 +80,7 @@ */ @Public @Stable + @AtMostOnce public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException; @@ -104,6 +105,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( */ @Public @Stable + @AtMostOnce public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 1f5bd05..ae0c449 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -18,6 +18,14 @@ package org.apache.hadoop.yarn.client; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,7 +130,7 @@ import org.junit.Before; -public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ +public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected static final HAServiceProtocol.StateChangeRequestInfo req = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER); @@ -759,6 +767,43 @@ public AllocateResponse allocate(AllocateRequest request) return createFakeAllocateResponse(); } + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeRegisterApplicationMasterResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return FinishApplicationMasterResponse.newInstance(true); + } + } + + public RegisterApplicationMasterResponse + createFakeRegisterApplicationMasterResponse() { + Resource minCapability = Resource.newInstance(2048, 2); + Resource maxCapability = Resource.newInstance(4096, 4); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.MODIFY_APP, "*"); + ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes()); + return RegisterApplicationMasterResponse.newInstance(minCapability, + maxCapability, acls, key, new ArrayList(), "root", + new ArrayList()); + } + + public FinishApplicationMasterResponse + createFakeFinishApplicationMasterResponse() { + return FinishApplicationMasterResponse.newInstance(true); } public AllocateResponse createFakeAllocateResponse() { @@ -769,4 +814,5 @@ public AllocateResponse createFakeAllocateResponse() { null, new ArrayList()); } } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java index 13020e8..fe957d0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java @@ -30,8 +30,13 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -75,6 +80,39 @@ public void shutDown() { } @Test(timeout = 15000) + public void testRegisterApplicationMasterOnHA() throws YarnException, + IOException { + RegisterApplicationMasterRequest request = + RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); + RegisterApplicationMasterResponse response = + amClient.registerApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeRegisterApplicationMasterResponse()); + failoverThread = createAndStartFailoverThread(); + RegisterApplicationMasterResponse response2 = + amClient.registerApplicationMaster(request); + Assert.assertEquals(response2, + this.cluster.createFakeRegisterApplicationMasterResponse()); + } + + @Test(timeout = 15000) + public void testFinishApplicationMasterOnHA() throws YarnException, + IOException { + FinishApplicationMasterRequest request = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + FinishApplicationMasterResponse response = + amClient.finishApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeFinishApplicationMasterResponse()); + failoverThread = createAndStartFailoverThread(); + FinishApplicationMasterResponse response2 = + amClient.finishApplicationMaster(request); + Assert.assertEquals(response2, + this.cluster.createFakeFinishApplicationMasterResponse()); + } + + @Test(timeout = 15000) public void testAllocateOnHA() throws YarnException, IOException { AllocateRequest request = AllocateRequest.newInstance(0, 50f, new ArrayList(),