diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 0634cc3..c2089b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -285,6 +286,7 @@ public void testRMWebAppRedirect() throws YarnException, getAdminService(0).transitionToActive(req); String rm1Url = "http://0.0.0.0:18088"; String rm2Url = "http://0.0.0.0:28088"; + String header = getHeader("Refresh", rm2Url); assertTrue(header.contains("; url=" + rm1Url)); @@ -323,6 +325,16 @@ public void testRMWebAppRedirect() throws YarnException, // Due to the limitation of MiniYARNCluster and dispatcher is a singleton, // we couldn't add the test case after explicitFailover(); + + // transit the active RM to standby + // Both of RMs are in standby mode + getAdminService(0).transitionToStandby(req); + // RM2 is expected to send the httpRequest to itself. + // The Header Field: Refresh is expected to be set. + String redirectURL = getRefreshURL(rm2Url); + assertTrue(redirectURL != null + && redirectURL.contains(YarnWebParams.NEXT_REFRESH_INTERVAL) + && redirectURL.contains(rm2Url)); } static String getHeader(String field, String url) { @@ -337,4 +349,17 @@ static String getHeader(String field, String url) { return fieldHeader; } + static String getRefreshURL(String url) { + String redirectUrl = null; + try { + HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); + // do not automatically follow the redirection + // otherwise we get too many redirections exception + conn.setInstanceFollowRedirects(false); + redirectUrl = conn.getHeaderField("Refresh"); + } catch (Exception e) { + // throw new RuntimeException(e); + } + return redirectUrl; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/YarnWebParams.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/YarnWebParams.java index 91d2a20..9105f40 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/YarnWebParams.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/YarnWebParams.java @@ -32,4 +32,5 @@ String APP_STATE = "app.state"; String QUEUE_NAME = "queue.name"; String NODE_STATE = "node.state"; + String NEXT_REFRESH_INTERVAL = "next.fresh.interval"; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebAppFilter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebAppFilter.java index 49fd1f5..b0db04d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebAppFilter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebAppFilter.java @@ -20,6 +20,10 @@ import java.io.IOException; import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Random; import java.util.Set; import javax.inject.Inject; @@ -29,7 +33,10 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HtmlQuoting; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.webapp.YarnWebParams; import com.google.common.collect.Sets; import com.google.inject.Injector; @@ -47,11 +54,26 @@ // define a set of URIs which do not need to do redirection private static final Set NON_REDIRECTED_URIS = Sets.newHashSet( "/conf", "/stacks", "/logLevel", "/logs"); + private String path; + private static final int BASIC_SLEEP_TIME = 5; + private static final int MAX_SLEEP_TIME = 5 * 60; @Inject - public RMWebAppFilter(Injector injector) { + public RMWebAppFilter(Injector injector, Configuration conf) { super(injector); this.injector=injector; + InetSocketAddress sock = YarnConfiguration.useHttps(conf) + ? conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT) + : conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT); + + path = sock.getHostName() + ":" + Integer.toString(sock.getPort()); + path = YarnConfiguration.useHttps(conf) + ? "https://" + path + : "http://" + path; } @Override @@ -68,9 +90,11 @@ public void doFilter(HttpServletRequest request, rmWebApp.checkIfStandbyRM(); if (rmWebApp.isStandby() && shouldRedirect(rmWebApp, uri)) { - String redirectPath = rmWebApp.getRedirectPath() + uri; + + String redirectPath = rmWebApp.getRedirectPath(); if (redirectPath != null && !redirectPath.isEmpty()) { + redirectPath += uri; String redirectMsg = "This is standby RM. Redirecting to the current active RM: " + redirectPath; @@ -78,11 +102,40 @@ public void doFilter(HttpServletRequest request, PrintWriter out = response.getWriter(); out.println(redirectMsg); return; + } else { + boolean doRetry = true; + String retryIntervalStr = + request.getParameter(YarnWebParams.NEXT_REFRESH_INTERVAL); + int retryInterval = 0; + if (retryIntervalStr != null) { + try { + retryInterval = Integer.parseInt(retryIntervalStr.trim()); + } catch (NumberFormatException ex) { + doRetry = false; + } + } + int next = calculateExponentialTime(retryInterval); + + String redirectUrl = + appendOrReplaceParamter(path + uri, + YarnWebParams.NEXT_REFRESH_INTERVAL + "=" + (retryInterval + 1)); + if (redirectUrl == null || next > MAX_SLEEP_TIME) { + doRetry = false; + } + String redirectMsg = + doRetry ? "Can not find any active RM. Will retry in next " + next + + " seconds." : "There is no active RM right now."; + PrintWriter out = response.getWriter(); + out.println(redirectMsg); + if (doRetry) { + response.setHeader("Refresh", next + ";url=" + redirectUrl); + response.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } } + return; } super.doFilter(request, response, chain); - } private boolean shouldRedirect(RMWebApp rmWebApp, String uri) { @@ -90,4 +143,33 @@ private boolean shouldRedirect(RMWebApp rmWebApp, String uri) { && !uri.equals("/" + rmWebApp.name() + "/cluster") && !NON_REDIRECTED_URIS.contains(uri); } + + private String appendOrReplaceParamter(String uri, String newQuery) { + if (uri.contains(YarnWebParams.NEXT_REFRESH_INTERVAL + "=")) { + return uri.replaceAll(YarnWebParams.NEXT_REFRESH_INTERVAL + "=[^&]+", + newQuery); + } + try { + URI oldUri = new URI(uri); + String appendQuery = oldUri.getQuery(); + if (appendQuery == null) { + appendQuery = newQuery; + } else { + appendQuery += "&" + newQuery; + } + + URI newUri = + new URI(oldUri.getScheme(), oldUri.getAuthority(), oldUri.getPath(), + appendQuery, oldUri.getFragment()); + + return newUri.toString(); + } catch (URISyntaxException e) { + return null; + } + } + + private static int calculateExponentialTime(int retries) { + long baseTime = BASIC_SLEEP_TIME * (1L << retries); + return (int) (baseTime * ((new Random()).nextDouble() + 0.5)); + } }