diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 486bebf..74ca61b 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -309,13 +309,4 @@ - - - - - - - - - diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 06bbc35..f5cc634 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -25,12 +25,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; @@ -39,16 +42,13 @@ public class ClientRMProxy extends RMProxy { private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); + private static final ClientRMProxy INSTANCE = new ClientRMProxy(); private interface ClientRMProtocols extends ApplicationClientProtocol, ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { // Add nothing } - static { - INSTANCE = new ClientRMProxy(); - } - private ClientRMProxy(){ super(); } @@ -63,9 +63,20 @@ private ClientRMProxy(){ */ public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { - // This method exists only to initiate this class' static INSTANCE. TODO: - // FIX if possible - return RMProxy.createRMProxy(configuration, protocol); + YarnConfiguration conf = (configuration instanceof YarnConfiguration) + ? (YarnConfiguration) configuration + : new YarnConfiguration(configuration); + RetryPolicy retryPolicy = createRetryPolicy(conf); + if (HAUtil.isHAEnabled(conf)) { + RMFailoverProxyProvider provider = + INSTANCE.createRMFailoverProxyProvider(conf, protocol); + return (T) RetryProxy.create(protocol, provider, retryPolicy); + } else { + InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); + LOG.info("Connecting to ResourceManager at " + rmAddress); + T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } } private static void setupTokens(InetSocketAddress resourceManagerAddress) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 913eb04..d160c7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -50,7 +50,6 @@ public class RMProxy { private static final Log LOG = LogFactory.getLog(RMProxy.class); - protected static RMProxy INSTANCE; protected RMProxy() {} @@ -72,31 +71,6 @@ protected InetSocketAddress getRMAddress( } /** - * Create a proxy for the specified protocol. For non-HA, - * this is a direct connection to the ResourceManager address. When HA is - * enabled, the proxy handles the failover between the ResourceManagers as - * well. - */ - @Private - protected static T createRMProxy(final Configuration configuration, - final Class protocol) throws IOException { - YarnConfiguration conf = (configuration instanceof YarnConfiguration) - ? (YarnConfiguration) configuration - : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = createRetryPolicy(conf); - if (HAUtil.isHAEnabled(conf)) { - RMFailoverProxyProvider provider = - INSTANCE.createRMFailoverProxyProvider(conf, protocol); - return (T) RetryProxy.create(protocol, provider, retryPolicy); - } else { - InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); - LOG.info("Connecting to ResourceManager at " + rmAddress); - T proxy = RMProxy.getProxy(conf, protocol, rmAddress); - return (T) RetryProxy.create(protocol, proxy, retryPolicy); - } - } - - /** * @deprecated * This method is deprecated and is not used by YARN internally any more. * To create a proxy to the RM, use ClientRMProxy#createRMProxy or @@ -125,7 +99,7 @@ protected InetSocketAddress getRMAddress( * RetryProxy. */ @Private - static T getProxy(final Configuration conf, + public static T getProxy(final Configuration conf, final Class protocol, final InetSocketAddress rmAddress) throws IOException { return UserGroupInformation.getCurrentUser().doAs( @@ -140,7 +114,7 @@ public T run() { /** * Helper method to create FailoverProxyProvider. */ - private RMFailoverProxyProvider createRMFailoverProxyProvider( + protected RMFailoverProxyProvider createRMFailoverProxyProvider( Configuration conf, Class protocol) { Class> defaultProviderClass; try { @@ -160,25 +134,6 @@ public T run() { } /** - * A RetryPolicy to allow failing over upto the specified maximum time. - */ - private static class FailoverUptoMaximumTimePolicy implements RetryPolicy { - private long maxTime; - - FailoverUptoMaximumTimePolicy(long maxTime) { - this.maxTime = maxTime; - } - - @Override - public RetryAction shouldRetry(Exception e, int retries, int failovers, - boolean isIdempotentOrAtMostOnce) throws Exception { - return System.currentTimeMillis() < maxTime - ? RetryAction.FAILOVER_AND_RETRY - : RetryAction.FAIL; - } - } - - /** * Fetch retry policy from Configuration */ @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 15a26e5..e86629a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -25,17 +25,18 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.base.Preconditions; public class ServerRMProxy extends RMProxy { private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); - - static { - INSTANCE = new ServerRMProxy(); - } + private static final ServerRMProxy INSTANCE = new ServerRMProxy(); private ServerRMProxy() { super(); @@ -51,9 +52,20 @@ private ServerRMProxy() { */ public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { - // This method exists only to initiate this class' static INSTANCE. TODO: - // FIX if possible - return RMProxy.createRMProxy(configuration, protocol); + YarnConfiguration conf = (configuration instanceof YarnConfiguration) + ? (YarnConfiguration) configuration + : new YarnConfiguration(configuration); + RetryPolicy retryPolicy = createRetryPolicy(conf); + if (HAUtil.isHAEnabled(conf)) { + RMFailoverProxyProvider provider = + INSTANCE.createRMFailoverProxyProvider(conf, protocol); + return (T) RetryProxy.create(protocol, provider, retryPolicy); + } else { + InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); + LOG.info("Connecting to ResourceManager at " + rmAddress); + T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } } @InterfaceAudience.Private