diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4cf7dcf..05e56fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -668,6 +668,10 @@ public static boolean isAclEnabled(Configuration conf) {
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI = RM_PREFIX
+ "fs.state-store.uri";
+ public static final String FS_RM_STATE_STORE_RETRY_POLICY_ENABLED = RM_PREFIX
+ + "fs.state-store.retry-policy.enabled";
+ public static final boolean DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_ENABLED =
+ false;
public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX
+ "fs.state-store.retry-policy-spec";
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
@@ -1936,6 +1940,12 @@ public static boolean isAclEnabled(Configuration conf) {
= 24 * 60 * 60;
public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy.enabled";
+ public static final boolean
+ DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED =
+ false;
+ public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
public static final String
@@ -2503,6 +2513,10 @@ public static boolean isAclEnabled(Configuration conf) {
/** URI for NodeLabelManager */
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
+ "fs-store.root-dir";
+ public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED =
+ NODE_LABELS_PREFIX + "fs-store.retry-policy.enabled";
+ public static final boolean DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED
+ = false;
public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
NODE_LABELS_PREFIX + "fs-store.retry-policy-spec";
public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index c431361..a8b6359 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -105,7 +105,11 @@ public FileSystemTimelineWriter(Configuration conf,
super(authUgi, client, resURI);
Configuration fsConf = new Configuration(conf);
- fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+ fsConf.setBoolean("dfs.client.retry.policy.enabled", fsConf.getBoolean(
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED,
+ YarnConfiguration
+ .DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED));
String retryPolicy =
fsConf.get(YarnConfiguration.
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
index 69b2316..79b107e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
@@ -94,7 +94,9 @@ public void close() throws IOException {
void setFileSystem(Configuration conf) throws IOException {
Configuration confCopy = new Configuration(conf);
- confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
+ confCopy.setBoolean("dfs.client.retry.policy.enabled", conf.getBoolean(
+ YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED,
+ YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED));
String retryPolicy =
confCopy.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3b869e6..f78c86f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -525,6 +525,15 @@
+ <property>
+ <description>
+ Enable hdfs client retry policy specification. When enabled, the hdfs
+ client retries based on the policy specified by
+ yarn.resourcemanager.fs.state-store.retry-policy-spec.
+ </description>
+ <name>yarn.resourcemanager.fs.state-store.retry-policy.enabled</name>
+ <value>false</value>
+ </property>
hdfs client retry policy specification. hdfs client retry
is always enabled. Specified in pairs of sleep-time and number-of-retries
and (t0, n0), (t1, n1), ..., the first n0 retries sleep t0 milliseconds on
@@ -2416,6 +2425,15 @@
+ <property>
+ <description>
+ Enable hdfs client retry policy specification. When enabled, the hdfs
+ client retries based on the policy specified by
+ yarn.node-labels.fs-store.retry-policy-spec.
+ </description>
+ <name>yarn.node-labels.fs-store.retry-policy.enabled</name>
+ <value>false</value>
+ </property>
Retry policy used for FileSystem node label store. The policy is
specified by N pairs of sleep-time in milliseconds and number-of-retries
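For illustration only (not part of the patch): a minimal Java sketch of setting the new retry-policy keys programmatically instead of through yarn-site.xml. The class name is hypothetical, and the spec value "2000, 500" is merely an example of the sleep-time/number-of-retries pairs described above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RetryPolicyConfigExample {
  public static Configuration create() {
    Configuration conf = new YarnConfiguration();
    // Opt in to the hdfs client retry policy for the FS RM state store.
    conf.setBoolean(
        YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_ENABLED, true);
    conf.set(
        YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, "2000, 500");
    // Likewise for the FileSystem node-labels store.
    conf.setBoolean(
        YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED, true);
    conf.set(
        YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC, "2000, 500");
    return conf;
  }
}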
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 9ea5e8c..a2ebc88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -142,7 +142,9 @@ protected synchronized void startInternal() throws Exception {
// authenticated with kerberos so we are good to create a file-system
// handle.
fsConf = new Configuration(getConfig());
- fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+ fsConf.setBoolean("dfs.client.retry.policy.enabled", fsConf.getBoolean(
+ YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_ENABLED,
+ YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_ENABLED));
String retryPolicy =
fsConf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC);
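The three call sites above (FileSystemTimelineWriter, FileSystemNodeLabelsStore and FileSystemRMStateStore) repeat the same guard: copy the incoming conf, then map the new YARN-level switch onto the hdfs client's dfs.client.retry.policy.enabled flag, so that flag is no longer hard-coded to true. A hypothetical helper, not part of this patch, sketching that shared shape:

import org.apache.hadoop.conf.Configuration;

final class DfsRetryPolicyUtil {
  private DfsRetryPolicyUtil() {
  }

  /**
   * Returns a copy of conf in which dfs.client.retry.policy.enabled is taken
   * from the given YARN-level key (defaulting to false) rather than being
   * forced to true.
   */
  static Configuration withDfsClientRetryPolicy(Configuration conf,
      String enabledKey, boolean enabledDefault) {
    Configuration copy = new Configuration(conf);
    copy.setBoolean("dfs.client.retry.policy.enabled",
        conf.getBoolean(enabledKey, enabledDefault));
    return copy;
  }
}

Leaving the new switches off by default lets the hdfs client fail over between NameNodes instead of being pinned to a stopped one, which appears to be the point of the HA test added below.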
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index 61088e1..1b7a22e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -37,6 +37,8 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -101,9 +103,11 @@ public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable
Path workingDirPath = new Path("/yarn/Test");
this.adminCheckEnable = adminCheckEnable;
this.cluster = cluster;
- FileSystem fs = cluster.getFileSystem();
+ FileSystem fs = cluster.getFileSystem(0);
fs.mkdirs(workingDirPath);
- Path clusterURI = new Path(cluster.getURI());
+ Path clusterURI = (cluster.getNumNameNodes() > 1) ?
+ new Path(HATestUtil.getLogicalUri(cluster)) :
+ new Path(cluster.getURI());
workingDirPathURI = new Path(clusterURI, workingDirPath);
fs.close();
}
@@ -122,6 +126,9 @@ public RMStateStore getRMStateStore() throws Exception {
conf.setBoolean(
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
}
+ if (cluster.getNumNameNodes() > 1) {
+ HATestUtil.setFailoverConfigurations(cluster, conf);
+ }
this.store = new TestFileSystemRMStore(conf);
Assert.assertEquals(store.getNumRetries(), 8);
Assert.assertEquals(store.getRetryInterval(), 900L);
@@ -425,4 +432,46 @@ public void run() {
cluster.shutdown();
}
}
+
+ @Test(timeout = 30000)
+ public void testFSRMStateStoreNNFailover() throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(1)
+ .build();
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+ try {
+ TestFSRMStateStoreTester fsTester =
+ new TestFSRMStateStoreTester(cluster, false);
+ final RMStateStore store = fsTester.getRMStateStore();
+ store.setRMDispatcher(new TestDispatcher());
+ final ApplicationId appid = ApplicationId.newInstance(100L, 1);
+ store.storeApplicationStateInternal(appid,
+ ApplicationStateData.newInstance(111, 111, "user", null,
+ RMAppState.ACCEPTED, "diagnostics", 333, null));
+ cluster.shutdownNameNode(0);
+ Thread clientThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ // With the dfs client retry policy enabled, this call would keep retrying
+ // the stopped NN and time out instead of failing over to the new active NN.
+ store.removeApplication(appid);
+ } catch (Exception e) {
+ LOG.warn(e);
+ Assert.fail("failed to remove application.");
+ }
+ }
+ };
+ clientThread.start();
+ Thread.sleep(1000);
+ cluster.transitionToActive(1);
+ clientThread.join();
+ } finally {
+ cluster.shutdown();
+ }
+ }
}