diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 73cc18558d1..d625b28a9fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -430,7 +430,8 @@ public GetAllResourceTypeInfoResponse getResourceTypeInfo( return pipeline.getRootInterceptor().getResourceTypeInfo(request); } - private RequestInterceptorChainWrapper getInterceptorChain() + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); if (!userPipelineMap.containsKey(user)) { @@ -504,36 +505,31 @@ protected ClientRequestInterceptor createRequestInterceptorChain() { * @param user */ private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; - synchronized (this.userPipelineMap) { - if (this.userPipelineMap.containsKey(user)) { - LOG.info("Request to start an already existing user: {}" - + " was received, so ignoring.", user); - return; - } - - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for application " - + "for the user: {}", user); - + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for application " + + "for the user: {}", user); + ClientRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } + LOG.error("Init ClientRequestInterceptor error for user: " + user, e); throw e; } + + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + this.userPipelineMap.put(user, chainWrapper); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index b8b7ad818f3..9936643795f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -165,7 +165,8 @@ protected void serviceStop() throws Exception { return interceptorClassNames; } - private RequestInterceptorChainWrapper getInterceptorChain() + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); if (!userPipelineMap.containsKey(user)) { @@ -240,35 +241,30 @@ protected RMAdminRequestInterceptor createRequestInterceptorChain() { * @param user */ private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; - synchronized (this.userPipelineMap) { - if (this.userPipelineMap.containsKey(user)) { - LOG.info("Request to start an already existing user: {}" - + " was received, so ignoring.", user); - return; - } - - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for the user: {}", user); - + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for user: {}", user); + RMAdminRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } + LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e); throw e; } + + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + this.userPipelineMap.put(user, chainWrapper); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index ae57f1cdc88..6bdcb781229 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -243,35 +243,30 @@ protected RESTRequestInterceptor createRequestInterceptorChain() { * @param user */ private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; - synchronized (this.userPipelineMap) { - if (this.userPipelineMap.containsKey(user)) { - LOG.info("Request to start an already existing user: {}" - + " was received, so ignoring.", user); - return; - } - - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for the user: {}", user); - + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for user: {}", user); + RESTRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } + LOG.error("Init RESTRequestInterceptor error for user: " + user, e); throw e; } + + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + this.userPipelineMap.put(user, chainWrapper); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java index a9c37293f69..9813572ad1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; @@ -207,4 +209,59 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + @Test + public void testClientPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client, to get a + * ClientRequestInterceptor for the user. To check the interceptor + * is null or not in child thread, we simply check it in the main thread. + */ + class ClientTestThread extends Thread { + private ClientRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private ClientRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public ClientRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getRouterClientRMService().getInterceptorChain(); + ClientRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init client interceptor success for user " + user); + return interceptor; + } + }); + } + } + + /* + * When multi thread begin to request pipeline the first time, we start a + * thread first, this thread may not finish init a chainWrapper before + * another thread start, so the second thread may init a chainWrapper too. + * But finally, the two thread will get the same chainWrapper, we should + * ensure the interceptors in the chainWrappers are equal and not null. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertEquals(client1.interceptor, client2.interceptor); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java index 11786e6f980..d7a33cda8d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; @@ -216,4 +218,59 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + @Test + public void testRMAdminPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client, to get a + * RequestInterceptorChainWrapper for the user. To check the interceptor + * is null or not in child thread, we simply check it in the main thread. + */ + class ClientTestThread extends Thread { + private RMAdminRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private RMAdminRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RMAdminRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getRouterRMAdminService().getInterceptorChain(); + RMAdminRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init rm admin interceptor success for user" + user); + return interceptor; + } + }); + } + } + + /* + * When multi thread begin to request pipeline the first time, we start a + * thread first, this thread may not finish init a chainWrapper before + * another thread start, so the second thread may init a chainWrapper too. + * But finally, the two thread will get the same chainWrapper, we should + * ensure the interceptors in the chainWrappers are equal and not null. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertEquals(client1.interceptor, client2.interceptor); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java index c96575c21a4..91d14dc5463 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServices.java @@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import javax.ws.rs.core.Response; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; @@ -49,12 +51,17 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test class to validate the WebService interceptor model inside the Router. */ public class TestRouterWebServices extends BaseRouterWebServicesTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterWebServices.class); + private String user = "test1"; /** @@ -266,4 +273,59 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + @Test + public void testWebPipelineConcurrent() throws InterruptedException { + final String user = "test1"; + + /* + * ClientTestThread is a thread to simulate a client, to get a + * RequestInterceptorChainWrapper for the user. To check the interceptor + * is null or not in child thread, we simply check it in the main thread. + */ + class ClientTestThread extends Thread { + private RESTRequestInterceptor interceptor; + @Override public void run() { + try { + interceptor = pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + private RESTRequestInterceptor pipeline() + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RESTRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper wrapper = + getInterceptorChain(user); + RESTRequestInterceptor interceptor = + wrapper.getRootInterceptor(); + Assert.assertNotNull(interceptor); + LOG.info("init web interceptor success for user" + user); + return interceptor; + } + }); + } + } + + /* + * When multi thread begin to request pipeline the first time, we start a + * thread first, this thread may not finish init a chainWrapper before + * another thread start, so the second thread may init a chainWrapper too. + * But finally, the two thread will get the same chainWrapper, we should + * ensure the interceptors in the chainWrappers are equal and not null. + */ + ClientTestThread client1 = new ClientTestThread(); + ClientTestThread client2 = new ClientTestThread(); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + + Assert.assertNotNull(client1.interceptor); + Assert.assertNotNull(client2.interceptor); + Assert.assertEquals(client1.interceptor, client2.interceptor); + } + }