diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index 5e91a20..c17d8ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -297,11 +297,16 @@ protected void initializePipeline( + " ApplicationId:" + applicationAttemptId + " for the user: " + user); - RequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext( - applicationAttemptId, user, amrmToken, localToken)); - chainWrapper.init(interceptorChain, applicationAttemptId); + try { + RequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(createApplicationMasterContext(this.nmContext, + applicationAttemptId, user, amrmToken, localToken)); + chainWrapper.init(interceptorChain, applicationAttemptId); + } catch (Exception e) { + this.applPipelineMap.remove(applicationAttemptId.getApplicationId()); + throw e; + } } /** @@ -317,8 +322,10 @@ protected void stopApplication(ApplicationId applicationId) { this.applPipelineMap.remove(applicationId); if (pipeline == null) { - LOG.info("Request to stop an application that does not exist. Id:" - + applicationId); + LOG.info( + "No interceptor pipeline for application {}," + + " likely because its AM is not run in this node.", + applicationId); } else { LOG.info("Stopping the request processing pipeline for application: " + applicationId); @@ -387,11 +394,11 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, } private AMRMProxyApplicationContext createApplicationMasterContext( - ApplicationAttemptId applicationAttemptId, String user, + Context context, ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken) { AMRMProxyApplicationContextImpl appContext = - new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(), + new AMRMProxyApplicationContextImpl(context, getConfig(), applicationAttemptId, user, amrmToken, localToken); return appContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 7f96947..9eb2c0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -121,9 +121,9 @@ public void setUp() { + MockRequestInterceptor.class.getName()); this.dispatcher = new AsyncDispatcher(); - this.dispatcher.init(conf); + this.dispatcher.init(this.conf); this.dispatcher.start(); - this.amrmProxyService = createAndStartAMRMProxyService(); + createAndStartAMRMProxyService(this.conf); } @After @@ -137,12 +137,19 @@ protected ExecutorService getThreadPool() { return threadpool; } - protected MockAMRMProxyService createAndStartAMRMProxyService() { - MockAMRMProxyService svc = + protected Configuration getConf() { + return this.conf; + } + + protected void createAndStartAMRMProxyService(Configuration config) { + // Stop the existing instance first if not null + if (this.amrmProxyService != null) { + this.amrmProxyService.stop(); + } + this.amrmProxyService = new MockAMRMProxyService(new NullContext(), dispatcher); - svc.init(conf); - svc.start(); - return svc; + this.amrmProxyService.init(config); + this.amrmProxyService.start(); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 7fffddf..6ac9d84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -20,17 +20,22 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -90,6 +95,32 @@ public void testRegisterOneApplicationMaster() throws Exception { } /** + * Tests the case when interceptor pipeline initialization fails. + */ + @Test + public void testInterceptorInitFailure() { + Configuration conf = this.getConf(); + // Override with a bad interceptor configuration + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + "class.that.does.not.exist"); + + // Reinitialize instance with the new config + createAndStartAMRMProxyService(conf); + int testAppId = 1; + try { + registerApplicationMaster(testAppId); + Assert.fail("Should not reach here. Expecting an exception thrown"); + } catch (Exception e) { + Map pipelines = + getAMRMProxyService().getPipelines(); + ApplicationId id = getApplicationId(testAppId); + Assert.assertTrue( + "The interceptor pipeline should be removed if initializtion fails", + pipelines.get(id) == null); + } + } + + /** * Tests the registration of multiple application master serially one at a * time. *