diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 73cc18558d1..f04cdc332cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -430,7 +430,8 @@ public GetAllResourceTypeInfoResponse getResourceTypeInfo( return pipeline.getRootInterceptor().getResourceTypeInfo(request); } - private RequestInterceptorChainWrapper getInterceptorChain() + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { String user = UserGroupInformation.getCurrentUser().getUserName(); if (!userPipelineMap.containsKey(user)) { @@ -504,36 +505,33 @@ protected ClientRequestInterceptor createRequestInterceptorChain() { * @param user */ private void initializePipeline(String user) { - RequestInterceptorChainWrapper chainWrapper = null; - synchronized (this.userPipelineMap) { - if (this.userPipelineMap.containsKey(user)) { - LOG.info("Request to start an already existing user: {}" - + " was received, so ignoring.", user); - return; - } - - chainWrapper = new RequestInterceptorChainWrapper(); - this.userPipelineMap.put(user, chainWrapper); - } - - // We register the pipeline instance in the map first and then initialize it - // later because chain initialization can be expensive and we would like to - // release the lock as soon as possible to prevent other applications from - // blocking when one application's chain is initializing - LOG.info("Initializing request processing pipeline for application " - + "for the user: {}", user); - + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); try { + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info("Initializing request processing pipeline for application " + + "for the user: {}", user); + ClientRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - synchronized (this.userPipelineMap) { - this.userPipelineMap.remove(user); - } + LOG.error("Init ClientRequestInterceptor error for user: " + user, e); throw e; } + + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + this.userPipelineMap.put(user, chainWrapper); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java index a9c37293f69..7f1d8939e7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java @@ -19,8 +19,11 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; @@ -207,4 +210,45 @@ public void testUsersChainMapWithLRUCache() Assert.assertNull("test2 should have been evicted", chain); } + @Test + public void testPipelineConcurrent() throws InterruptedException { + final String user = "yarn"; + AtomicBoolean nullValue = new AtomicBoolean(false); + + class ClientThread extends Thread { + @Override public void run() { + try { + pipeline(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + private void pipeline() throws IOException, InterruptedException { + UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override public ClientRequestInterceptor run() throws Exception { + RequestInterceptorChainWrapper pipeline = + getRouterClientRMService().getInterceptorChain(); + ClientRequestInterceptor root = pipeline.getRootInterceptor(); + if(root == null) + nullValue.set(true); + Assert.assertNotNull(root); + System.out.println("init interceptor success for user " + user); + return root; + } + }); + } + } + + ClientThread client1 = new ClientThread(); + ClientThread client2 = new ClientThread(); + client1.start(); + Thread.sleep(1); + client2.start(); + client1.join(); + client2.join(); + Assert.assertFalse(nullValue.get()); + } + }