diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 486bebf..74ca61b 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -309,13 +309,4 @@
-
-
-
-
-
-
-
-
-
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
index 06bbc35..f5cc634 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
@@ -25,12 +25,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
@@ -39,16 +42,13 @@
public class ClientRMProxy extends RMProxy {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
+ private static final ClientRMProxy INSTANCE = new ClientRMProxy();
private interface ClientRMProtocols extends ApplicationClientProtocol,
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
// Add nothing
}
- static {
- INSTANCE = new ClientRMProxy();
- }
-
private ClientRMProxy(){
super();
}
@@ -63,9 +63,20 @@ private ClientRMProxy(){
*/
public static T createRMProxy(final Configuration configuration,
final Class protocol) throws IOException {
- // This method exists only to initiate this class' static INSTANCE. TODO:
- // FIX if possible
- return RMProxy.createRMProxy(configuration, protocol);
+ YarnConfiguration conf = (configuration instanceof YarnConfiguration)
+ ? (YarnConfiguration) configuration
+ : new YarnConfiguration(configuration);
+ RetryPolicy retryPolicy = createRetryPolicy(conf);
+ if (HAUtil.isHAEnabled(conf)) {
+ RMFailoverProxyProvider provider =
+ INSTANCE.createRMFailoverProxyProvider(conf, protocol);
+ return (T) RetryProxy.create(protocol, provider, retryPolicy);
+ } else {
+ InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ T proxy = RMProxy.getProxy(conf, protocol, rmAddress);
+ return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+ }
}
private static void setupTokens(InetSocketAddress resourceManagerAddress)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index 913eb04..d160c7a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -50,7 +50,6 @@
public class RMProxy {
private static final Log LOG = LogFactory.getLog(RMProxy.class);
- protected static RMProxy INSTANCE;
protected RMProxy() {}
@@ -72,31 +71,6 @@ protected InetSocketAddress getRMAddress(
}
/**
- * Create a proxy for the specified protocol. For non-HA,
- * this is a direct connection to the ResourceManager address. When HA is
- * enabled, the proxy handles the failover between the ResourceManagers as
- * well.
- */
- @Private
- protected static T createRMProxy(final Configuration configuration,
- final Class protocol) throws IOException {
- YarnConfiguration conf = (configuration instanceof YarnConfiguration)
- ? (YarnConfiguration) configuration
- : new YarnConfiguration(configuration);
- RetryPolicy retryPolicy = createRetryPolicy(conf);
- if (HAUtil.isHAEnabled(conf)) {
- RMFailoverProxyProvider provider =
- INSTANCE.createRMFailoverProxyProvider(conf, protocol);
- return (T) RetryProxy.create(protocol, provider, retryPolicy);
- } else {
- InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- T proxy = RMProxy.getProxy(conf, protocol, rmAddress);
- return (T) RetryProxy.create(protocol, proxy, retryPolicy);
- }
- }
-
- /**
* @deprecated
* This method is deprecated and is not used by YARN internally any more.
* To create a proxy to the RM, use ClientRMProxy#createRMProxy or
@@ -125,7 +99,7 @@ protected InetSocketAddress getRMAddress(
* RetryProxy.
*/
@Private
- static T getProxy(final Configuration conf,
+ public static T getProxy(final Configuration conf,
final Class protocol, final InetSocketAddress rmAddress)
throws IOException {
return UserGroupInformation.getCurrentUser().doAs(
@@ -140,7 +114,7 @@ public T run() {
/**
* Helper method to create FailoverProxyProvider.
*/
- private RMFailoverProxyProvider createRMFailoverProxyProvider(
+ protected RMFailoverProxyProvider createRMFailoverProxyProvider(
Configuration conf, Class protocol) {
Class extends RMFailoverProxyProvider> defaultProviderClass;
try {
@@ -160,25 +134,6 @@ public T run() {
}
/**
- * A RetryPolicy to allow failing over upto the specified maximum time.
- */
- private static class FailoverUptoMaximumTimePolicy implements RetryPolicy {
- private long maxTime;
-
- FailoverUptoMaximumTimePolicy(long maxTime) {
- this.maxTime = maxTime;
- }
-
- @Override
- public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isIdempotentOrAtMostOnce) throws Exception {
- return System.currentTimeMillis() < maxTime
- ? RetryAction.FAILOVER_AND_RETRY
- : RetryAction.FAIL;
- }
- }
-
- /**
* Fetch retry policy from Configuration
*/
@Private
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index 15a26e5..e86629a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -25,17 +25,18 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
import org.apache.hadoop.yarn.client.RMProxy;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.base.Preconditions;
public class ServerRMProxy extends RMProxy {
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
-
- static {
- INSTANCE = new ServerRMProxy();
- }
+ private static final ServerRMProxy INSTANCE = new ServerRMProxy();
private ServerRMProxy() {
super();
@@ -51,9 +52,20 @@ private ServerRMProxy() {
*/
public static T createRMProxy(final Configuration configuration,
final Class protocol) throws IOException {
- // This method exists only to initiate this class' static INSTANCE. TODO:
- // FIX if possible
- return RMProxy.createRMProxy(configuration, protocol);
+ YarnConfiguration conf = (configuration instanceof YarnConfiguration)
+ ? (YarnConfiguration) configuration
+ : new YarnConfiguration(configuration);
+ RetryPolicy retryPolicy = createRetryPolicy(conf);
+ if (HAUtil.isHAEnabled(conf)) {
+ RMFailoverProxyProvider provider =
+ INSTANCE.createRMFailoverProxyProvider(conf, protocol);
+ return (T) RetryProxy.create(protocol, provider, retryPolicy);
+ } else {
+ InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ T proxy = RMProxy.getProxy(conf, protocol, rmAddress);
+ return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+ }
}
@InterfaceAudience.Private