diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index cfabc5c9687..ca36b439351 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -591,8 +591,7 @@ public AppInfo getApp(HttpServletRequest hsr, String appId, } /** - * The YARN Router will forward to the respective YARN RM in which the AM is - * running. + * The YARN Router will forward to all sub clusters. *

* Possible failures and behaviors: *

@@ -625,11 +624,11 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr, } SubClusterInfo subClusterInfo = null; - SubClusterId subClusterId = null; + SubClusterId homeSubClusterId = null; try { - subClusterId = + homeSubClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); - subClusterInfo = federationFacade.getSubCluster(subClusterId); + subClusterInfo = federationFacade.getSubCluster(homeSubClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedKilled(); return Response @@ -638,13 +637,48 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr, .build(); } - Response response = getOrCreateInterceptorForSubCluster(subClusterId, + Response response = getOrCreateInterceptorForSubCluster(homeSubClusterId, subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState, hsr, appId); long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); + Map subClustersActive = null; + try { + subClustersActive = federationFacade.getSubClusters(true); + } catch (YarnException e) { + routerMetrics.incrMultipleAppsFailedRetrieved(); + return null; + } + + // HttpServletRequest does not work with ExecutorCompletionService. + // Create a duplicate hsr. + final HttpServletRequest hsrCopy = clone(hsr); + + // Broadcast UpdateAppState to all secondary sub clusters. + // Don't wait for completion. + for (SubClusterInfo info : subClustersActive.values()) { + if (info.getSubClusterId() != homeSubClusterId) { + this.threadpool.submit(new Callable() { + @Override + public Void call() { + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + try { + interceptor.updateAppState(targetState, hsrCopy, appId); + return null; + } catch (Exception e) { + LOG.error("UpdateAppState request to application {} in SubCluster {} throws exception {}", + appId, info.getSubClusterId(), e); + return null; + } + } + }); + } + } + return response; }