diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4cf7dcf..c96b84c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -668,6 +668,10 @@ public static boolean isAclEnabled(Configuration conf) {
   /** URI for FileSystemRMStateStore */
   public static final String FS_RM_STATE_STORE_URI = RM_PREFIX
       + "fs.state-store.uri";
+  public static final String FS_RM_STATE_STORE_RETRY_POLICY_ENABLED = RM_PREFIX
+      + "fs.state-store.retry-policy.enabled";
+  public static final boolean DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_ENABLED =
+      true;
   public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX
       + "fs.state-store.retry-policy-spec";
   public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
@@ -1936,6 +1940,12 @@ public static boolean isAclEnabled(Configuration conf) {
       = 24 * 60 * 60;
 
   public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy.enabled";
+  public static final boolean
+      DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED =
+      true;
+  public static final String
       TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
       TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
   public static final String
@@ -2503,6 +2513,10 @@ public static boolean isAclEnabled(Configuration conf) {
   /** URI for NodeLabelManager */
   public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
       + "fs-store.root-dir";
+  public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED =
+      NODE_LABELS_PREFIX + "fs-store.retry-policy.enabled";
+  public static final boolean DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED
+      = true;
   public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
       NODE_LABELS_PREFIX + "fs-store.retry-policy-spec";
   public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index c431361..a8b6359 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -105,7 +105,11 @@ public FileSystemTimelineWriter(Configuration conf,
     super(authUgi, client, resURI);
 
     Configuration fsConf = new Configuration(conf);
-    fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+    fsConf.setBoolean("dfs.client.retry.policy.enabled", fsConf.getBoolean(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED,
+        YarnConfiguration
+            .DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_ENABLED));
     String retryPolicy = fsConf.get(YarnConfiguration.
        TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
index 69b2316..79b107e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
@@ -94,7 +94,9 @@ public void close() throws IOException {
 
   void setFileSystem(Configuration conf) throws IOException {
     Configuration confCopy = new Configuration(conf);
-    confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
+    confCopy.setBoolean("dfs.client.retry.policy.enabled", conf.getBoolean(
+        YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED,
+        YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_ENABLED));
     String retryPolicy =
         confCopy.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC,
             YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3b869e6..a7b20ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -525,6 +525,15 @@
+  <property>
+    <description>Enable hdfs client retry policy specification. When enabled,
+    the hdfs client attempts to retry based on the policy specified by
+    yarn.resourcemanager.fs.state-store.retry-policy-spec.
+    </description>
+    <name>yarn.resourcemanager.fs.state-store.retry-policy.enabled</name>
+    <value>true</value>
+  </property>
+
   <property>
     <description>hdfs client retry policy specification. hdfs client retry
     is always enabled. Specified in pairs of sleep-time and number-of-retries
     and (t0, n0), (t1, n1), ..., the first n0 retries sleep t0 milliseconds on
@@ -2416,6 +2425,15 @@
+  <property>
+    <description>Enable hdfs client retry policy specification. When enabled,
+    the hdfs client attempts to retry based on the policy specified by
+    yarn.node-labels.fs-store.retry-policy-spec.
+    </description>
+    <name>yarn.node-labels.fs-store.retry-policy.enabled</name>
+    <value>true</value>
+  </property>
+
   <property>
     <description>
     Retry policy used for FileSystem node label store. The policy is
     specified by N pairs of sleep-time in milliseconds and number-of-retries
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 9ea5e8c..a2ebc88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -142,7 +142,9 @@ protected synchronized void startInternal() throws Exception {
     // authenticated with kerberos so we are good to create a file-system
     // handle.
     fsConf = new Configuration(getConfig());
-    fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+    fsConf.setBoolean("dfs.client.retry.policy.enabled", fsConf.getBoolean(
+        YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_ENABLED,
+        YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_ENABLED));
     String retryPolicy =
         fsConf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
             YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index 61088e1..15869c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -37,6 +37,8 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -101,9 +103,11 @@ public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable
       Path workingDirPath = new Path("/yarn/Test");
       this.adminCheckEnable = adminCheckEnable;
       this.cluster = cluster;
-      FileSystem fs = cluster.getFileSystem();
+      FileSystem fs = cluster.getFileSystem(0);
       fs.mkdirs(workingDirPath);
-      Path clusterURI = new Path(cluster.getURI());
+      Path clusterURI = (cluster.getNumNameNodes() > 1) ?
+          new Path(HATestUtil.getLogicalUri(cluster)) :
+          new Path(cluster.getURI());
       workingDirPathURI = new Path(clusterURI, workingDirPath);
       fs.close();
     }
@@ -122,6 +126,10 @@ public RMStateStore getRMStateStore() throws Exception {
         conf.setBoolean(
             YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
       }
+      if (cluster.getNumNameNodes() > 1) {
+        HATestUtil.setFailoverConfigurations(cluster, conf);
+        conf.setBoolean(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_ENABLED, false);
+      }
       this.store = new TestFileSystemRMStore(conf);
       Assert.assertEquals(store.getNumRetries(), 8);
       Assert.assertEquals(store.getRetryInterval(), 900L);
@@ -425,4 +433,46 @@ public void run() {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 30000)
+  public void testFSRMStateStoreNNFailover() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf)
+            .nnTopology(MiniDFSNNTopology.simpleHATopology())
+            .numDataNodes(1)
+            .build();
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+    try {
+      TestFSRMStateStoreTester fsTester =
+          new TestFSRMStateStoreTester(cluster, false);
+      final RMStateStore store = fsTester.getRMStateStore();
+      store.setRMDispatcher(new TestDispatcher());
+      final ApplicationId appid = ApplicationId.newInstance(100L, 1);
+      store.storeApplicationStateInternal(appid,
+          ApplicationStateData.newInstance(111, 111, "user", null,
+              RMAppState.ACCEPTED, "diagnostics", 333, null));
+      cluster.shutdownNameNode(0);
+      Thread clientThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            // expected to keep retrying to connect to the stopped NN and
+            // then time out if the retry policy of the DFS client is enabled.
+            store.removeApplication(appid);
+          } catch (Exception e) {
+            LOG.warn(e);
+            Assert.fail("failed to remove application.");
+          }
+        }
+      };
+      clientThread.start();
+      Thread.sleep(1000);
+      cluster.transitionToActive(1);
+      clientThread.join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
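
For reference, a minimal yarn-site.xml sketch (illustrative values only, not part of the patch) showing how the switches added above could be turned off on a cluster whose RM state store and node label store live on an HA HDFS namespace, so that the HDFS client falls back to its normal HA failover retries instead of the spec-based retry policy, mirroring what the new test does via FS_RM_STATE_STORE_RETRY_POLICY_ENABLED:

  <!-- Illustrative yarn-site.xml overrides; defaults remain true. -->
  <property>
    <name>yarn.resourcemanager.fs.state-store.retry-policy.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.node-labels.fs-store.retry-policy.enabled</name>
    <value>false</value>
  </property>

The timeline entity-group store has the analogous switch introduced in this patch under the yarn.timeline-service.entity-group-fs-store prefix.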