diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index f4ae9be..53a93c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -80,6 +80,7 @@ */ @Public @Stable + @AtMostOnce public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException; @@ -104,6 +105,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( */ @Public @Stable + @AtMostOnce public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f7d6b6b..2242d89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1249,6 +1249,16 @@ public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy"; public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY .name(); + public static final String RM_APPMASTER_ENABLE_RETRY_CACHE_KEY = YARN_PREFIX + + "enable.retrycache"; + public static final boolean RM_APPMASTER_ENABLE_RETRY_CACHE_DEFAULT = false; + public static final String RM_APPMASTER_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = + YARN_PREFIX + ".retrycache" + ".expirytime.millis"; + public static final long RM_APPMASTER_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 + // minutes + public static final String RM_APPMASTER_RETRY_CACHE_HEAP_PERCENT_KEY = + YARN_PREFIX + "retrycache.heap.percent"; + public static final float RM_APPMASTER_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f; public YarnConfiguration() { super(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 1f5bd05..175cedc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -18,6 +18,14 @@ package org.apache.hadoop.yarn.client; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,7 +130,23 @@ import org.junit.Before; -public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ +/** + * Test Base for ResourceManager's Protocol on HA. + * + * Limited scope: + * For all the test cases, we only test whether the method will be re-entry + * when failover happens. Does not cover the entire logic test. + * + * Test strategy: + * Create a separate failover thread with a trigger flag, + * override all APIs that added trigger flag. + * When the APIs are called, we will set trigger flag as true to kick off + * the failover. So We can make sure the failover happens during process + * of the method. If this API is marked as @Idempotent or @AtMostOnce, + * the test cases will pass; otherwise, they will throw the exception. + * + */ +public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected static final HAServiceProtocol.StateChangeRequestInfo req = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER); @@ -759,6 +783,43 @@ public AllocateResponse allocate(AllocateRequest request) return createFakeAllocateResponse(); } + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeRegisterApplicationMasterResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeFinishApplicationMasterResponse(); + } + } + + public RegisterApplicationMasterResponse + createFakeRegisterApplicationMasterResponse() { + Resource minCapability = Resource.newInstance(2048, 2); + Resource maxCapability = Resource.newInstance(4096, 4); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.MODIFY_APP, "*"); + ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes()); + return RegisterApplicationMasterResponse.newInstance(minCapability, + maxCapability, acls, key, new ArrayList(), "root", + new ArrayList()); + } + + public FinishApplicationMasterResponse + createFakeFinishApplicationMasterResponse() { + return FinishApplicationMasterResponse.newInstance(true); } public AllocateResponse createFakeAllocateResponse() { @@ -769,4 +830,5 @@ public AllocateResponse createFakeAllocateResponse() { null, new ArrayList()); } } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterProtocolOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterProtocolOnHA.java new file mode 100644 index 0000000..3544433 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterProtocolOnHA.java @@ -0,0 +1,120 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import junit.framework.Assert; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestApplicationMasterProtocolOnHA extends ProtocolHATestBase { + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId; + + @Before + public void initiate() throws Exception { + startHACluster(0, false, false, true); + attemptId = this.cluster.createFakeApplicationAttemptId(); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + + AMRMTokenIdentifier id = + new AMRMTokenIdentifier(attemptId); + Token appToken = + new Token(id, this.cluster.getResourceManager() + .getRMContext().getAMRMTokenSecretManager()); + appToken.setService(new Text("appToken service")); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser() + .getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + @Test(timeout = 15000) + public void testRegisterApplicationMasterOnHA() throws YarnException, + IOException { + RegisterApplicationMasterRequest request = + RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); + RegisterApplicationMasterResponse response = + amClient.registerApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeRegisterApplicationMasterResponse()); + } + + @Test(timeout = 15000) + public void testFinishApplicationMasterOnHA() throws YarnException, + IOException { + FinishApplicationMasterRequest request = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + FinishApplicationMasterResponse response = + amClient.finishApplicationMaster(request); + Assert.assertEquals(response, + this.cluster.createFakeFinishApplicationMasterResponse()); + } + + @Test(timeout = 15000) + public void testAllocateOnHA() throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList(), + new ArrayList(), + ResourceBlacklistRequest.newInstance(new ArrayList(), + new ArrayList())); + AllocateResponse response = amClient.allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + } + + private void syncToken(Token token) throws IOException { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + this.cluster.getResourceManager(i).getRMContext() + .getAMRMTokenSecretManager().addPersistedPassword(token); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java deleted file mode 100644 index 13020e8..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java +++ /dev/null @@ -1,94 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.client; - -import java.io.IOException; -import java.util.ArrayList; - -import junit.framework.Assert; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - - -public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ - private ApplicationMasterProtocol amClient; - private ApplicationAttemptId attemptId ; - RMAppAttempt appAttempt; - - @Before - public void initiate() throws Exception { - startHACluster(0, false, false, true); - attemptId = this.cluster.createFakeApplicationAttemptId(); - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); - - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(attemptId); - Token appToken = - new Token(id, this.cluster.getResourceManager() - .getRMContext().getAMRMTokenSecretManager()); - appToken.setService(new Text("appToken service")); - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser() - .getUserName())); - UserGroupInformation.getCurrentUser().addToken(appToken); - syncToken(appToken); - } - - @After - public void shutDown() { - if(this.amClient != null) { - RPC.stopProxy(this.amClient); - } - } - - @Test(timeout = 15000) - public void testAllocateOnHA() throws YarnException, IOException { - AllocateRequest request = AllocateRequest.newInstance(0, 50f, - new ArrayList(), - new ArrayList(), - ResourceBlacklistRequest.newInstance(new ArrayList(), - new ArrayList())); - AllocateResponse response = amClient.allocate(request); - Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); - } - - private void syncToken(Token token) throws IOException { - for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { - this.cluster.getResourceManager(i).getRMContext() - .getAMRMTokenSecretManager().addPersistedPassword(token); - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6b2cb7f..f12123d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RetryCache; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; @@ -108,6 +109,7 @@ private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; + private RetryCache retryCache; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); @@ -118,6 +120,13 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { } @Override + protected void serviceInit(Configuration conf) throws Exception { + this.retryCache = initRetryCache(conf); + + super.serviceInit(conf); + } + + @Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); @@ -226,7 +235,25 @@ private ApplicationAttemptId authorizeRequest() public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { + RetryCache.CacheEntryWithPayload cacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + // Return previous response + return (RegisterApplicationMasterResponse)cacheEntry.getPayload(); + } + RegisterApplicationMasterResponse response = null; + try { + response = processRegisterApplicationMaster(request); + } finally { + RetryCache.setState(cacheEntry, response != null, response); + } + return response; + } + + public RegisterApplicationMasterResponse processRegisterApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { ApplicationAttemptId applicationAttemptId = authorizeRequest(); ApplicationId appID = applicationAttemptId.getApplicationId(); @@ -249,80 +276,107 @@ public RegisterApplicationMasterResponse registerApplicationMaster( + applicationAttemptId.getApplicationId(); LOG.warn(message); RMAuditLogger.logFailure( - this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), - AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - applicationAttemptId.getApplicationId(), applicationAttemptId); + this.rmContext.getRMApps() + .get(applicationAttemptId.getApplicationId()).getUser(), + AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, + applicationAttemptId.getApplicationId(), applicationAttemptId); throw new InvalidApplicationMasterRequestException(message); } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp app = this.rmContext.getRMApps().get(appID); - + // Setting the response id to 0 to identify if the // application master is register for the respective attemptid lastResponse.setResponseId(0); lock.setAllocateResponse(lastResponse); LOG.info("AM registration " + applicationAttemptId); this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppAttemptRegistrationEvent(applicationAttemptId, request - .getHost(), request.getRpcPort(), request.getTrackingUrl())); + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptRegistrationEvent(applicationAttemptId, request + .getHost(), request.getRpcPort(), request.getTrackingUrl())); RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM, - "ApplicationMasterService", appID, applicationAttemptId); - - // Pick up min/max resource from scheduler... - RegisterApplicationMasterResponse response = recordFactory - .newRecordInstance(RegisterApplicationMasterResponse.class); - response.setMaximumResourceCapability(rScheduler - .getMaximumResourceCapability()); - response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) - .getSubmissionContext().getAMContainerSpec().getApplicationACLs()); - response.setQueue(app.getQueue()); - if (UserGroupInformation.isSecurityEnabled()) { - LOG.info("Setting client token master key"); - response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext - .getClientToAMTokenSecretManager() - .getMasterKey(applicationAttemptId).getEncoded())); - } + "ApplicationMasterService", appID, applicationAttemptId); + return createRegisterApplicationMasterResponse(applicationAttemptId); + } + } - // For work-preserving AM restart, retrieve previous attempts' containers - // and corresponding NM tokens. - List transferredContainers = - ((AbstractYarnScheduler) rScheduler) + // assumed to be called with lock + private RegisterApplicationMasterResponse createRegisterApplicationMasterResponse( + ApplicationAttemptId applicationAttemptId) + throws UnknownHostException { + ApplicationId appID = applicationAttemptId.getApplicationId(); + RMApp app = this.rmContext.getRMApps().get(appID); + + // Pick up min/max resource from scheduler... + RegisterApplicationMasterResponse response = recordFactory + .newRecordInstance(RegisterApplicationMasterResponse.class); + response.setMaximumResourceCapability(rScheduler + .getMaximumResourceCapability()); + response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) + .getSubmissionContext().getAMContainerSpec().getApplicationACLs()); + response.setQueue(app.getQueue()); + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Setting client token master key"); + response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext + .getClientToAMTokenSecretManager() + .getMasterKey(applicationAttemptId).getEncoded())); + } + + // For work-preserving AM restart, retrieve previous attempts' containers + // and corresponding NM tokens. + List transferredContainers = + ((AbstractYarnScheduler) rScheduler) .getTransferredContainers(applicationAttemptId); - if (!transferredContainers.isEmpty()) { - response.setContainersFromPreviousAttempts(transferredContainers); - List nmTokens = new ArrayList(); - for (Container container : transferredContainers) { - try { - nmTokens.add(rmContext.getNMTokenSecretManager() + if (!transferredContainers.isEmpty()) { + response.setContainersFromPreviousAttempts(transferredContainers); + List nmTokens = new ArrayList(); + for (Container container : transferredContainers) { + try { + nmTokens.add(rmContext.getNMTokenSecretManager() .createAndGetNMToken(app.getUser(), applicationAttemptId, - container)); - } catch (IllegalArgumentException e) { - // if it's a DNS issue, throw UnknowHostException directly and that - // will be automatically retried by RMProxy in RPC layer. - if (e.getCause() instanceof UnknownHostException) { - throw (UnknownHostException) e.getCause(); - } + container)); + } catch (IllegalArgumentException e) { + // if it's a DNS issue, throw UnknowHostException directly and that + // will be automatically retried by RMProxy in RPC layer. + if (e.getCause() instanceof UnknownHostException) { + throw (UnknownHostException) e.getCause(); } } - response.setNMTokensFromPreviousAttempts(nmTokens); - LOG.info("Application " + appID + " retrieved " - + transferredContainers.size() + " containers from previous" - + " attempts and " + nmTokens.size() + " NM tokens."); } - return response; + response.setNMTokensFromPreviousAttempts(nmTokens); + LOG.info("Application " + appID + " retrieved " + + transferredContainers.size() + " containers from previous" + + " attempts and " + nmTokens.size() + " NM tokens."); } + return response; } @Override public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { + RetryCache.CacheEntryWithPayload cacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + // Return previous response + return (FinishApplicationMasterResponse)cacheEntry.getPayload(); + } + + FinishApplicationMasterResponse response = null; + try { + response = processFinishApplicationMaster(request); + } finally { + RetryCache.setState(cacheEntry, response != null, response); + } + return response; + } + public FinishApplicationMasterResponse processFinishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { ApplicationAttemptId applicationAttemptId = authorizeRequest(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); @@ -636,6 +690,7 @@ protected void serviceStop() throws Exception { if (this.server != null) { this.server.stop(); } + RetryCache.clear(retryCache); super.serviceStop(); } @@ -659,4 +714,33 @@ public synchronized void setAllocateResponse(AllocateResponse response) { public Server getServer() { return this.server; } + + @VisibleForTesting + static RetryCache initRetryCache(Configuration conf) { + boolean enable = conf.getBoolean( + YarnConfiguration.RM_APPMASTER_ENABLE_RETRY_CACHE_KEY, + YarnConfiguration.RM_APPMASTER_ENABLE_RETRY_CACHE_DEFAULT); + LOG.info("Retry cache on ApplicationMasterService is " + (enable ? "enabled" : "disabled")); + if (enable) { + float heapPercent = conf.getFloat( + YarnConfiguration.RM_APPMASTER_RETRY_CACHE_HEAP_PERCENT_KEY, + YarnConfiguration.RM_APPMASTER_RETRY_CACHE_HEAP_PERCENT_DEFAULT); + long entryExpiryMillis = conf.getLong( + YarnConfiguration.RM_APPMASTER_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY, + YarnConfiguration.RM_APPMASTER_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT); + LOG.info("Retry cache will use " + heapPercent + + " of total heap and retry cache entry expiry time is " + + entryExpiryMillis + " millis"); + long entryExpiryNanos = entryExpiryMillis * 1000 * 1000; + return new RetryCache("AppMasterServiceRetryCache", heapPercent, + entryExpiryNanos); + } + return null; + } + + @VisibleForTesting + RetryCache getRetryCache() { + return retryCache; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index cfd05f9..99345a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.junit.Assert; import org.apache.hadoop.security.UserGroupInformation; @@ -220,15 +221,21 @@ public AllocateResponse run() throws Exception { } } - public void unregisterAppAttempt() throws Exception { - waitForState(RMAppAttemptState.RUNNING); + public FinishApplicationMasterResponse unregisterAppAttempt() + throws Exception { + return unregisterAppAttempt(true); + } + + public FinishApplicationMasterResponse unregisterAppAttempt( + boolean waitForStateRunning) throws Exception { final FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( - FinalApplicationStatus.SUCCEEDED, "", ""); - unregisterAppAttempt(req,true); + FinalApplicationStatus.SUCCEEDED, "", ""); + return unregisterAppAttempt(req, waitForStateRunning); } - public void unregisterAppAttempt(final FinishApplicationMasterRequest req, + public FinishApplicationMasterResponse unregisterAppAttempt( + final FinishApplicationMasterRequest req, boolean waitForStateRunning) throws Exception { if (waitForStateRunning) { waitForState(RMAppAttemptState.RUNNING); @@ -239,13 +246,16 @@ public void unregisterAppAttempt(final FinishApplicationMasterRequest req, context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - amRMProtocol.finishApplicationMaster(req); - return null; - } - }); + try { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public FinishApplicationMasterResponse run() throws Exception { + return amRMProtocol.finishApplicationMaster(req); + } + }); + } catch (UndeclaredThrowableException e) { + throw (Exception) e.getCause(); + } } public ApplicationAttemptId getApplicationAttemptId() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..c43be8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -504,16 +505,17 @@ protected void startWepApp() { // Disable webapp } - public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, - MockAM am) throws Exception { + public static FinishApplicationMasterResponse finishAMAndVerifyAppState( + RMApp rmApp, MockRM rm, MockNM nm, MockAM am) throws Exception { FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.SUCCEEDED, "", ""); - am.unregisterAppAttempt(req,true); + FinishApplicationMasterResponse res = am.unregisterAppAttempt(req, true); am.waitForState(RMAppAttemptState.FINISHING); nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + return res; } public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index afe28aa..b938317 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,60 +18,35 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.collect.Maps; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; -import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; - -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; + import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentMap; import static java.lang.Thread.sleep; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.*; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -79,6 +54,8 @@ private final int GB = 1024; private static YarnConfiguration conf; + static { DefaultMetricsSystem.setMiniClusterMode(true); } + @BeforeClass public static void setup() { conf = new YarnConfiguration(); @@ -283,4 +260,25 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { } } } + + @Test(timeout = 3000000) + public void testRMIdentifierOnContainerAllocationWithRetryCache() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_APPMASTER_ENABLE_RETRY_CACHE_KEY, true); + testRMIdentifierOnContainerAllocation(); + } + + @Test(timeout = 1200000) + public void testProgressFilterWithRetryCache() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_APPMASTER_ENABLE_RETRY_CACHE_KEY, true); + testProgressFilter(); + } + + @Test(timeout=1200000) + public void testFinishApplicationMasterBeforeRegisteringWithRetryCache() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_APPMASTER_ENABLE_RETRY_CACHE_KEY, true); + testFinishApplicationMasterBeforeRegistering(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceRetryCache.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceRetryCache.java new file mode 100644 index 0000000..e4c6e3e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceRetryCache.java @@ -0,0 +1,137 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static java.lang.Thread.sleep; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcConstants; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestApplicationMasterServiceRetryCache { + private Log LOG = LogFactory.getLog(TestApplicationMasterServiceRetryCache.class); + private static final byte[] CLIENT_ID = ClientId.getClientId(); + private YarnConfiguration conf; + private static int callId = 100; + private final int GB = 1024; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_APPMASTER_ENABLE_RETRY_CACHE_KEY, true); + } + + /** Set the current Server RPC call */ + public static void newCall() { + Server.Call call = new Server.Call(++callId, 1, null, null, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID); + Server.getCurCall().set(call); + } + + @Test(timeout = 30000) + public void testAPIsWithRetryCache() throws Exception { + MockRM rm = null; + try { + rm = new MockRM(conf); + rm.start(); + // Register node1 + //ApplicationMasterService amService = rm.getRMContext().getApplicationMasterService(); + MockNM nm = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app = rm.submitApp(2048); + + // kick the scheduling + nm.nodeHeartbeat(true); + RMAppAttempt attempt1 = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId()); + newCall(); + RegisterApplicationMasterResponse res = am.registerAppAttempt(); + assertEquals(res, am.registerAppAttempt(false)); + assertEquals(res, am.registerAppAttempt(false)); + newCall(); + try { + am.registerAppAttempt(false); + fail("InvalidApplicationMasterRequestException should be thrown."); + } catch (InvalidApplicationMasterRequestException e) { + } + + newCall(); + am.addRequests(new String[]{"127.0.0.1"}, GB, 1, 1); + AllocateResponse alloc1Response = am.schedule(); // send the request + + // kick the scheduler + nm.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + alloc1Response = am.schedule(); + } + + // assert RMIdentifer is set properly in allocated containers + Container allocatedContainer = + alloc1Response.getAllocatedContainers().get(0); + ContainerTokenIdentifier tokenId = + BuilderUtils.newContainerTokenIdentifier(allocatedContainer.getContainerToken()); + org.junit.Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifer()); + + newCall(); + FinishApplicationMasterResponse finishResponse = + MockRM.finishAMAndVerifyAppState(app, rm, nm, am); + assertEquals(finishResponse, am.unregisterAppAttempt(false)); + newCall(); + try { + am.unregisterAppAttempt(false); + fail("InvalidApplicationMasterRequestException should be thrown."); + } catch (InvalidApplicationMasterRequestException e) { + // InvalidApplicationMasterRequestException is thrown + // after expiring RetryCache + } + + } finally { + if (rm != null){ + rm.stop(); + } + } + } + +}