diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6677478..d667519 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3084,15 +3084,18 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
+ // 5 minutes
+ public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
+
+ public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
+ FEDERATION_PREFIX + "flush-cache-for-rm-addr";
+ public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
public static final String FEDERATION_REGISTRY_BASE_KEY =
FEDERATION_PREFIX + "registry.base-dir";
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";
- // 5 minutes
- public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
-
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
index e3f9155..12f96c9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
@@ -49,7 +50,11 @@
import org.junit.Test;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -61,12 +66,20 @@
private FederationStateStore stateStore;
private final String dummyCapability = "cap";
+ private GetClusterMetricsResponse threadResponse;
+
@Before
public void setUp() throws IOException, YarnException {
conf = new YarnConfiguration();
- stateStore = new MemoryFederationStateStore();
+
+ // Use a one-hour facade cache TTL so cached entries cannot expire mid-test
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60);
+
+ stateStore = spy(new MemoryFederationStateStore());
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
+ verify(stateStore, times(0))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
}
@After
@@ -75,12 +88,25 @@ public void tearDown() throws Exception {
stateStore = null;
}
- @Test
+ @Test(timeout = 60000)
public void testFederationRMFailoverProxyProvider() throws Exception {
+ testProxyProvider(true);
+ }
+
+ @Test (timeout=60000)
+ public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache()
+ throws Exception {
+ testProxyProvider(false);
+ }
+
+ private void testProxyProvider(boolean facadeFlushCache) throws Exception {
final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
final MiniYARNCluster cluster = new MiniYARNCluster(
"testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
+ conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
+ facadeFlushCache);
+
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
@@ -104,10 +130,16 @@ public void testFederationRMFailoverProxyProvider() throws Exception {
.createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
UserGroupInformation.getCurrentUser());
+ verify(stateStore, times(1))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
+
// client will retry until the rm becomes active.
GetClusterMetricsResponse response =
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+ verify(stateStore, times(1))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
+
// validate response
checkResponse(response);
@@ -118,7 +150,50 @@ public void testFederationRMFailoverProxyProvider() throws Exception {
// Transition rm2 to active;
makeRMActive(subClusterId, cluster, 1);
- response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+
+ verify(stateStore, times(1))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
+
+ threadResponse = null;
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Without the flush, this call keeps hitting the cached old RM
+ // address and keeps failing until the cache is flushed
+ threadResponse =
+ client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+ } catch (YarnException | IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ thread.start();
+
+ if (!facadeFlushCache) {
+ // Wait so the thread has hopefully started hitting the old cached address
+ Thread.sleep(500);
+
+ // Should still be hitting cache
+ verify(stateStore, times(1))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
+
+ // Force flush cache, so that it will pick up the new RM address
+ FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
+ true);
+ }
+
+ // Wait for the thread to finish and grab result
+ thread.join();
+ response = threadResponse;
+
+ if (facadeFlushCache) {
+ verify(stateStore, atLeast(2))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
+ } else {
+ verify(stateStore, times(2))
+ .getSubClusters(any(GetSubClustersInfoRequest.class));
+ }
// validate response
checkResponse(response);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index adf8d8a..c801963 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2923,6 +2923,15 @@
+
+ Whether to flush FederationStateStoreFacade cache to get subcluster info
+ when FederationRMFailoverProxyProvider is performing failover.
+
+ yarn.federation.flush-cache-for-rm-addr
+ true
+
+
+
The registry base directory for federation.
yarn.federation.registry.base-dir
yarnfederation/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
index c631208..cf6d1ef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -64,7 +64,8 @@
private FederationStateStoreFacade facade;
private SubClusterId subClusterId;
private UserGroupInformation originalUser;
- private boolean federationFailoverEnabled = false;
+ private boolean federationFailoverEnabled;
+ private boolean flushFacadeCacheForYarnRMAddr;
@Override
public void init(Configuration configuration, RMProxy proxy,
@@ -75,13 +76,16 @@ public void init(Configuration configuration, RMProxy proxy,
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId);
- this.facade = facade.getInstance();
+ this.facade = FederationStateStoreFacade.getInstance();
if (configuration instanceof YarnConfiguration) {
this.conf = (YarnConfiguration) configuration;
}
federationFailoverEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
+ flushFacadeCacheForYarnRMAddr =
+ conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
+ YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@@ -119,7 +123,8 @@ private T getProxyInternal(boolean isFailover) {
try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId);
- subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
+ subClusterInfo = facade.getSubCluster(subClusterId,
+ this.flushFacadeCacheForYarnRMAddr && isFailover);
// updating the conf with the refreshed RM addresses as proxy
// creations are based out of conf
updateRMAddress(subClusterInfo);