diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index b254665..80598a4 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -305,4 +305,9 @@ + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 72ad08f..ed7589a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -315,22 +315,30 @@ public static final String RM_STORE = RM_PREFIX + "store.class"; /** URI for FileSystemRMStateStore */ - public static final String FS_RM_STATE_STORE_URI = - RM_PREFIX + "fs.state-store.uri"; + public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + + "fs.state-store.uri"; + public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX + + "fs.state-store.retry-policy-spec"; + public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = + "2000, 500"; /** * Comma separated host:port pairs, each corresponding to a ZK server for * ZKRMStateStore */ public static final String ZK_STATE_STORE_PREFIX = - RM_PREFIX + "zk.state-store."; + RM_PREFIX + "zk-state-store."; public static final String ZK_RM_STATE_STORE_NUM_RETRIES = ZK_STATE_STORE_PREFIX + "num-retries"; - public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3; + public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500; + /** retry interval when connecting to zookeeper*/ + public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = + ZK_STATE_STORE_PREFIX + "retry-interval-ms"; + public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000; public static final String ZK_RM_STATE_STORE_ADDRESS = ZK_STATE_STORE_PREFIX + "address"; /** Timeout in millisec for ZK server connection for ZKRMStateStore */ public static final String ZK_RM_STATE_STORE_TIMEOUT_MS = - ZK_STATE_STORE_PREFIX + "timeout.ms"; + ZK_STATE_STORE_PREFIX + "timeout-ms"; public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000; /** Parent znode path under which ZKRMStateStore will create znodes */ public static final String ZK_RM_STATE_STORE_PARENT_PATH = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7f6e050..d798f4c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -283,8 +283,8 @@ is implicitly fenced, meaning a single ResourceManager is able to use the store at any point in time. More details on this, along with setting up appropriate ACLs is discussed under the description for - yarn.resourcemanager.zk.state-store.root-node.acl. - yarn.resourcemanager.zk.state-store.address + yarn.resourcemanager.zk-state-store.root-node.acl. + yarn.resourcemanager.zk-state-store.address @@ -293,8 +293,15 @@ ZooKeeper. This may be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.num-retries - 3 + yarn.resourcemanager.zk-state-store.num-retries + 500 + + + + Retry interval in milliseconds when ZKRMStateStore tries to + connect to ZooKeeper. + yarn.resourcemanager.zk-state-store.retry-interval-ms + 2000 @@ -302,16 +309,20 @@ stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.parent-path + yarn.resourcemanager.zk-state-store.parent-path /rmstore - Timeout when connecting to ZooKeeper. + ZooKeeper session timeout in milliseconds. Session expiration + is managed by the ZooKeeper cluster itself, not by the client. This value is + used by the cluster to determine when the client's session expires. + Expirations happens when the cluster does not hear from the client within + the specified session timeout period (i.e. no heartbeat). This may be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.timeout.ms + yarn.resourcemanager.zk-state-store.timeout-ms 60000 @@ -320,7 +331,7 @@ This may be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.acl + yarn.resourcemanager.zk-state-store.acl world:anyone:rwcda @@ -336,7 +347,7 @@ permissions. By default, when this property is not set, we use the ACLs from - yarn.resourcemanager.zk.state-store.acl for shared admin access and + yarn.resourcemanager.zk-state-store.acl for shared admin access and rm-address:cluster-timestamp for username-based exclusive create-delete access. @@ -346,7 +357,7 @@ ResourceManagers have shared admin access and the Active ResourceManger takes over (exclusively) the create-delete access. - yarn.resourcemanager.zk.state-store.root-node.acl + yarn.resourcemanager.zk-state-store.root-node.acl @@ -360,6 +371,16 @@ + hdfs client retry policy specification. hdfs client retry + is always enabled. Specified in pairs of sleep-time and number-of-retries + and (t0, n0), (t1, n1), ..., the first n0 retries sleep t0 milliseconds on + average, the following n1 retries sleep t1 milliseconds on average, and so on. + + yarn.resourcemanager.fs.state-store.retry-policy-spec + 2000, 500 + + + Enable RM high-availability. When enabled, (1) The RM starts in the Standby mode by default, and transitions to the Active mode when prompted to. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 2ef6bcd..23cefd3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -94,7 +94,14 @@ protected synchronized void startInternal() throws Exception { // create filesystem only now, as part of service-start. By this time, RM is // authenticated with kerberos so we are good to create a file-system // handle. - fs = fsWorkingPath.getFileSystem(getConfig()); + Configuration conf = new Configuration(getConfig()); + conf.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = + conf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC); + conf.set("dfs.client.retry.policy.spec", retryPolicy); + + fs = fsWorkingPath.getFileSystem(conf); fs.mkdirs(rmDTSecretManagerRoot); fs.mkdirs(rmAppRoot); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1621d83..f419ff0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -82,6 +82,7 @@ private String zkHostPort = null; private int zkSessionTimeout; + private long zkRetryInterval; private List zkAcl; private String zkRootNodePath; private String rmDTSecretManagerRoot; @@ -161,6 +162,9 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkSessionTimeout = conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS); + zkRetryInterval = + conf.getLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS); // Parse authentication from configuration. String zkAclConf = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL, @@ -810,6 +814,9 @@ T runWithRetries() throws Exception { } } catch (KeeperException ke) { if (shouldRetry(ke.code()) && ++retry < numRetries) { + LOG.info("Waiting for zookeeper to be connected, retry no. + " + + retry); + Thread.sleep(zkRetryInterval); continue; } throw ke; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 63fe975..4df1c3b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicBoolean; + import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -33,7 +36,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -81,6 +86,8 @@ public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString()); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, + "100,6000"); this.store = new TestFileSystemRMStore(conf); return store; } @@ -139,4 +146,46 @@ public void testFSRMStateStore() throws Exception { cluster.shutdown(); } } + + @Test (timeout = 30000) + public void testFSRMStateStoreClientRetry() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + try { + TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); + final RMStateStore store = fsTester.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); + cluster.shutdownNameNodes(); + + Thread clientThread = new Thread() { + @Override + public void run() { + try { + store.storeApplicationStateInternal("application1", + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(111, 111, "user", null, + RMAppState.ACCEPTED, "diagnostics", 333)); + } catch (Exception e) { + // TODO 0 datanode exception will not be retried by dfs client, fix + // that separately. + if (!e.getMessage().contains("could only be replicated" + + " to 0 nodes instead of minReplication (=1)")) { + assertionFailedInThread.set(true); + } + e.printStackTrace(); + } + } + }; + Thread.sleep(2000); + clientThread.start(); + cluster.restartNameNode(); + clientThread.join(); + Assert.assertFalse(assertionFailedInThread.get()); + } finally { + cluster.shutdown(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 82e550c..3def83c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -114,6 +115,37 @@ public RMStateStore getRMStateStore(Configuration conf) throws Exception { } } + @Test (timeout = 20000) + public void testZKClientRetry() throws Exception { + TestZKClient zkClientTester = new TestZKClient(); + final String path = "/test"; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); + conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100); + final ZKRMStateStore store = + (ZKRMStateStore) zkClientTester.getRMStateStore(conf); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); + + stopServer(); + Thread clientThread = new Thread() { + @Override + public void run() { + try { + store.getDataWithRetries(path, true); + } catch (Exception e) { + e.printStackTrace(); + assertionFailedInThread.set(true); + } + } + }; + Thread.sleep(2000); + startServer(); + clientThread.join(); + Assert.assertFalse(assertionFailedInThread.get()); + } + @Test(timeout = 20000) public void testZKClientDisconnectAndReconnect() throws Exception {