diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index f4ae9be..53a93c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -80,6 +80,7 @@ */ @Public @Stable + @AtMostOnce public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException; @@ -104,6 +105,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( */ @Public @Stable + @AtMostOnce public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 034ec4f..fa6fbec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -524,6 +524,20 @@ "NONE"; //////////////////////////////// + // RetryCache Configs + //////////////////////////////// + public static final String RM_RETRY_CACHE_ENABLED = RM_PREFIX + + "retry-cache.enabled"; + public static final boolean DEFAULT_RM_RETRY_CACHE_ENABLED = false; + public static final String RM_RETRY_CACHE_EXPIRY_MS = + RM_PREFIX + "retry-cache.expiry-ms"; + // 10 minutes (in milliseconds) + public static final long DEFAULT_RM_RETRY_CACHE_EXPIRY_MS = 600000; + public static final String 
RM_RETRY_CACHE_HEAP_PERCENT = + RM_PREFIX + "retry-cache.heap.percent"; + public static final float DEFAULT_RM_RETRY_CACHE_HEAP_PERCENT = 0.03f; + + //////////////////////////////// // Node Manager Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 72cb1b1..5318dd5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -18,6 +18,14 @@ package org.apache.hadoop.yarn.client; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,7 +130,23 @@ import org.junit.Before; -public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ +/** + * Test Base for ResourceManager's Protocol on HA. + * + * Limited scope: + * For all the test cases, we only test whether the method will be re-entry + * when failover happens. Does not cover the entire logic test. + * + * Test strategy: + * Create a separate failover thread with a trigger flag, + * override all APIs that added trigger flag. + * When the APIs are called, we will set trigger flag as true to kick off + * the failover. 
So we can make sure the failover happens during the processing + * of the method. If this API is marked as @Idempotent or @AtMostOnce, + * the test cases will pass; otherwise, they will throw an exception. + * + */ +public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected static final HAServiceProtocol.StateChangeRequestInfo req = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER); @@ -760,6 +784,43 @@ public AllocateResponse allocate(AllocateRequest request) return createFakeAllocateResponse(); } + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeRegisterApplicationMasterResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeFinishApplicationMasterResponse(); + } + } + + public RegisterApplicationMasterResponse + createFakeRegisterApplicationMasterResponse() { + Resource minCapability = Resource.newInstance(2048, 2); + Resource maxCapability = Resource.newInstance(4096, 4); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.MODIFY_APP, "*"); + ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes()); + return RegisterApplicationMasterResponse.newInstance(minCapability, + maxCapability, acls, key, new ArrayList(), "root", + new ArrayList()); + } + + public FinishApplicationMasterResponse + createFakeFinishApplicationMasterResponse() { + return FinishApplicationMasterResponse.newInstance(true); } public AllocateResponse createFakeAllocateResponse() { @@ -770,4 +831,5 @@ public AllocateResponse 
createFakeAllocateResponse() { null, new ArrayList()); } } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java deleted file mode 100644 index 0b42ac3..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java +++ /dev/null @@ -1,92 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.hadoop.yarn.client; - -import java.io.IOException; -import java.util.ArrayList; - -import org.junit.Assert; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - - -public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ - private ApplicationMasterProtocol amClient; - private ApplicationAttemptId attemptId ; - RMAppAttempt appAttempt; - - @Before - public void initiate() throws Exception { - startHACluster(0, false, false, true); - attemptId = this.cluster.createFakeApplicationAttemptId(); - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); - - Token appToken = - this.cluster.getResourceManager().getRMContext() - .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(new Text("appToken service")); - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser() - .getUserName())); - UserGroupInformation.getCurrentUser().addToken(appToken); - syncToken(appToken); - } - - @After - public void shutDown() { - if(this.amClient != null) { - RPC.stopProxy(this.amClient); - } - } - 
- @Test(timeout = 15000) - public void testAllocateOnHA() throws YarnException, IOException { - AllocateRequest request = AllocateRequest.newInstance(0, 50f, - new ArrayList(), - new ArrayList(), - ResourceBlacklistRequest.newInstance(new ArrayList(), - new ArrayList())); - AllocateResponse response = amClient.allocate(request); - Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); - } - - private void syncToken(Token token) throws IOException { - for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { - this.cluster.getResourceManager(i).getRMContext() - .getAMRMTokenSecretManager().addPersistedPassword(token); - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java new file mode 100644 index 0000000..81015e3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java @@ -0,0 +1,119 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import org.junit.Assert; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestApplicationMasterServiceProtocolOnHA + extends ProtocolHATestBase { + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId ; + + @Before + public void initiate() throws Exception { + startHACluster(0, false, false, true); + attemptId = this.cluster.createFakeApplicationAttemptId(); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + + Token appToken = + this.cluster.getResourceManager().getRMContext() + .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); + appToken.setService(new Text("appToken service")); + 
UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser() + .getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + @Test(timeout = 15000) + public void testRegisterApplicationMasterOnHA() throws YarnException, + IOException { + RegisterApplicationMasterRequest request = + RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); + RegisterApplicationMasterResponse response = + amClient.registerApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeRegisterApplicationMasterResponse()); + } + + @Test(timeout = 15000) + public void testFinishApplicationMasterOnHA() throws YarnException, + IOException { + FinishApplicationMasterRequest request = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + FinishApplicationMasterResponse response = + amClient.finishApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeFinishApplicationMasterResponse()); + } + + @Test(timeout = 15000) + public void testAllocateOnHA() throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList(), + new ArrayList(), + ResourceBlacklistRequest.newInstance(new ArrayList(), + new ArrayList())); + AllocateResponse response = amClient.allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + } + + private void syncToken(Token token) throws IOException { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + this.cluster.getResourceManager(i).getRMContext() + .getAMRMTokenSecretManager().addPersistedPassword(token); + } + } +} diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index d77180c..eabe9d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RetryCache; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; @@ -114,6 +115,7 @@ private final AllocateResponse shutdown = recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; + private RetryCache retryCache; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); @@ -124,6 +126,11 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { this.rmContext = rmContext; } + protected void serviceInit(Configuration conf) throws Exception { + this.retryCache = initRetryCache(conf); + super.serviceInit(conf); + } + @Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); @@ -237,6 +244,26 @@ public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { + RetryCache.CacheEntryWithPayload 
cacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + // Return previous response + return (RegisterApplicationMasterResponse) cacheEntry.getPayload(); + } + + RegisterApplicationMasterResponse response = null; + try { + response = processRegisterApplicationMaster(request); + } finally { + RetryCache.setState(cacheEntry, response != null, response); + } + return response; + } + + public RegisterApplicationMasterResponse processRegisterApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); ApplicationAttemptId applicationAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); @@ -338,6 +365,26 @@ public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { + RetryCache.CacheEntryWithPayload cacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + // Return previous response + return (FinishApplicationMasterResponse) cacheEntry.getPayload(); + } + + FinishApplicationMasterResponse response = null; + try { + response = processFinishApplicationMaster(request); + } finally { + RetryCache.setState(cacheEntry, response != null, response); + } + return response; + } + + public FinishApplicationMasterResponse processFinishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + ApplicationAttemptId applicationAttemptId = authorizeRequest().getApplicationAttemptId(); @@ -414,6 +461,25 @@ public boolean hasApplicationMasterRegistered( public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + RetryCache.CacheEntryWithPayload cacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + // Return previous response + return 
(AllocateResponse) cacheEntry.getPayload(); + } + + AllocateResponse response = null; + try { + response = processAllocate(request); + } finally { + RetryCache.setState(cacheEntry, response != null, response); + } + return response; + } + + private AllocateResponse processAllocate(AllocateRequest request) + throws YarnException { + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); ApplicationAttemptId appAttemptId = @@ -673,6 +739,7 @@ protected void serviceStop() throws Exception { if (this.server != null) { this.server.stop(); } + RetryCache.clear(retryCache); super.serviceStop(); } @@ -696,4 +763,27 @@ public synchronized void setAllocateResponse(AllocateResponse response) { public Server getServer() { return this.server; } + + @VisibleForTesting + static RetryCache initRetryCache(Configuration conf) { + boolean enable = conf.getBoolean( + YarnConfiguration.RM_RETRY_CACHE_ENABLED, + YarnConfiguration.DEFAULT_RM_RETRY_CACHE_ENABLED); + LOG.info("Retry cache on ApplicationMasterService is " + (enable ? 
"enabled" : "disabled")); + if (enable) { + float heapPercent = conf.getFloat( + YarnConfiguration.RM_RETRY_CACHE_HEAP_PERCENT, + YarnConfiguration.DEFAULT_RM_RETRY_CACHE_HEAP_PERCENT); + long entryExpiryMillis = conf.getLong( + YarnConfiguration.RM_RETRY_CACHE_EXPIRY_MS, + YarnConfiguration.DEFAULT_RM_RETRY_CACHE_EXPIRY_MS); + LOG.info("Retry cache will use " + heapPercent + + " of total heap and retry cache entry expiry time is " + + entryExpiryMillis + " millis"); + long entryExpiryNanos = entryExpiryMillis * 1000 * 1000; + return new RetryCache("AppMasterServiceRetryCache", heapPercent, + entryExpiryNanos); + } + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 91e1905..1c75993 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -222,15 +223,24 @@ public AllocateResponse run() throws Exception { } } - public void unregisterAppAttempt() throws 
Exception { - waitForState(RMAppAttemptState.RUNNING); + public FinishApplicationMasterResponse unregisterAppAttempt() + throws Exception { + return unregisterAppAttempt(true); + } + + public FinishApplicationMasterResponse unregisterAppAttempt( + boolean waitForStateRunning) throws Exception { + if (waitForStateRunning) { + waitForState(RMAppAttemptState.RUNNING); + } final FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( - FinalApplicationStatus.SUCCEEDED, "", ""); - unregisterAppAttempt(req,true); + FinalApplicationStatus.SUCCEEDED, "", ""); + return unregisterAppAttempt(req, waitForStateRunning); } - public void unregisterAppAttempt(final FinishApplicationMasterRequest req, + public FinishApplicationMasterResponse unregisterAppAttempt( + final FinishApplicationMasterRequest req, boolean waitForStateRunning) throws Exception { if (waitForStateRunning) { waitForState(RMAppAttemptState.RUNNING); @@ -241,13 +251,16 @@ public void unregisterAppAttempt(final FinishApplicationMasterRequest req, context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - amRMProtocol.finishApplicationMaster(req); - return null; - } - }); + try { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public FinishApplicationMasterResponse run() throws Exception { + return amRMProtocol.finishApplicationMaster(req); + } + }); + } catch (UndeclaredThrowableException e) { + throw (Exception) e.getCause(); + } } public ApplicationAttemptId getApplicationAttemptId() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3817637..fd61e70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -31,6 +31,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -561,16 +562,17 @@ protected void startWepApp() { // Disable webapp } - public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, - MockAM am) throws Exception { + public static FinishApplicationMasterResponse finishAMAndVerifyAppState( + RMApp rmApp, MockRM rm, MockNM nm, MockAM am) throws Exception { FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.SUCCEEDED, "", ""); - am.unregisterAppAttempt(req,true); + FinishApplicationMasterResponse res = am.unregisterAppAttempt(req, true); am.waitForState(RMAppAttemptState.FINISHING); nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + return res; } public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index b0ffc85..f0dc7b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -44,7 +45,6 @@ import java.util.List; import static java.lang.Thread.sleep; -import static org.mockito.Matchers.any; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -52,6 +52,8 @@ private final int GB = 1024; private static YarnConfiguration conf; + static { DefaultMetricsSystem.setMiniClusterMode(true); } + @BeforeClass public static void setup() { conf = new YarnConfiguration(); @@ -234,29 +236,47 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.FAILED, "", ""); - Throwable cause = null; try { am1.unregisterAppAttempt(req, false); + Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); + } catch (ApplicationMasterNotRegisteredException e) { + Assert.assertNotNull(e); + Assert.assertNotNull(e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "Application Master is trying to unregister 
before registering for:" + )); } catch (Exception e) { - cause = e.getCause(); + Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); } - Assert.assertNotNull(cause); - Assert - .assertTrue(cause instanceof ApplicationMasterNotRegisteredException); - Assert.assertNotNull(cause.getMessage()); - Assert - .assertTrue(cause - .getMessage() - .contains( - "Application Master is trying to unregister before registering for:")); am1.registerAppAttempt(); am1.unregisterAppAttempt(req, false); + am1.waitForState(RMAppAttemptState.FINISHING); } finally { if (rm != null) { rm.stop(); } } } + + @Test(timeout = 1200000) + public void testRMIdentifierOnContainerAllocationWithRetryCache() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_RETRY_CACHE_ENABLED, true); + testRMIdentifierOnContainerAllocation(); + } + + @Test(timeout = 1200000) + public void testProgressFilterWithRetryCache() throws Exception { + conf.setBoolean(YarnConfiguration.RM_RETRY_CACHE_ENABLED, true); + testProgressFilter(); + } + + @Test(timeout = 1200000) + public void testFinishApplicationMasterBeforeRegisteringWithRetryCache() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_RETRY_CACHE_ENABLED, true); + testFinishApplicationMasterBeforeRegistering(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceRetryCache.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceRetryCache.java new file mode 100644 index 0000000..8d566a1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceRetryCache.java @@ -0,0 +1,154 @@ +/** +* Licensed to the Apache Software Foundation (ASF) 
under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static java.lang.Thread.sleep; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import 
org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for ensuring the RM's AppMaster retry cache works correctly for + * non-idempotent requests. + * + * Retry cache works based on tracking previously received request based on the + * ClientId and CallId received in RPC requests and storing the response. The + * response is replayed on retry when the same request is received again. + * + * The test works by manipulating the Rpc {@link Server} current RPC call. For + * testing retried requests, an Rpc callId is generated only once using + * {@link #newCall()} and reused for many method calls. For testing non-retried + * request, a new callId is generated using {@link #newCall()}. + */ +public class TestApplicationMasterServiceRetryCache { + private static final Log LOG = LogFactory.getLog(TestApplicationMasterServiceRetryCache.class); + private static final byte[] CLIENT_ID = ClientId.getClientId(); + private YarnConfiguration conf; + private static int callId = 100; + private static final int GB = 1024; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_RETRY_CACHE_ENABLED, true); + } + + /** Set the current Server RPC call */ + public static void newCall() { + Server.Call call = new Server.Call(++callId, 1, null, null, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID); + Server.getCurCall().set(call); + } + + @Test(timeout = 120000) + public void testAPIsWithRetryCache() throws Exception { + MockRM rm = null; + try { + rm = new MockRM(conf); + rm.start(); + // Register node1 + MockNM nm = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app = rm.submitApp(2048); + + // kick the scheduling + nm.nodeHeartbeat(true); + RMAppAttempt attempt1 = app.getCurrentAppAttempt(); + MockAM am = 
rm.sendAMLaunched(attempt1.getAppAttemptId()); + newCall(); + RegisterApplicationMasterResponse res = am.registerAppAttempt(); + assertEquals("If callId is identical, an identical response should be returned.", + res, am.registerAppAttempt(false)); + + newCall(); + try { + am.registerAppAttempt(false); + fail("InvalidApplicationMasterRequestException should be thrown."); + } catch (InvalidApplicationMasterRequestException e) { + assertTrue(e instanceof InvalidApplicationMasterRequestException); + } + + newCall(); + am.addRequests(new String[]{"127.0.0.1"}, GB, 1, 1); + AllocateResponse alloc1Response = am.schedule(); // send the request + assertEquals("If callId is identical, an identical response should be returned.", + alloc1Response, am.schedule()); + + // kick the scheduler + nm.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + newCall(); + alloc1Response = am.schedule(); + } + + // assert RMIdentifer is set properly in allocated containers + Container allocatedContainer = + alloc1Response.getAllocatedContainers().get(0); + ContainerTokenIdentifier tokenId = + BuilderUtils.newContainerTokenIdentifier(allocatedContainer.getContainerToken()); + assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifer()); + + + newCall(); + FinishApplicationMasterResponse finishResponse = + MockRM.finishAMAndVerifyAppState(app, rm, nm, am); + assertEquals("If callId is identical, an identical response should be returned.", + finishResponse, am.unregisterAppAttempt(false)); + newCall(); + try { + am.unregisterAppAttempt(false); + fail("InvalidApplicationMasterRequestException should be thrown."); + } catch (InvalidApplicationMasterRequestException e) { + assertTrue(e instanceof InvalidApplicationMasterRequestException); + } + + } finally { + if (rm != null){ + rm.stop(); + } + } + } + +}