diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 76d280a..4960f95 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -175,6 +175,14 @@
hadoop-yarn-server-web-proxy
+ org.apache.curator
+ curator-client
+
+
+ org.apache.curator
+ curator-test
+
+
org.apache.zookeeper
zookeeper
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 15ac971..bf82972 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -29,15 +29,19 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,14 +68,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
@@ -152,20 +150,14 @@
protected String znodeWorkingPath;
@VisibleForTesting
- protected ZooKeeper zkClient;
-
- /* activeZkClient is not used to do actual operations,
- * it is only used to verify client session for watched events and
- * it gets activated into zkClient on connection event.
- */
- @VisibleForTesting
- ZooKeeper activeZkClient;
+ protected CuratorFramework curatorFramework;
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private String fencingNodePath;
private Op createFencingNodePathOp;
private Op deleteFencingNodePathOp;
+
private Thread verifyActiveStatusThread;
private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong());
@@ -192,7 +184,7 @@
@Unstable
protected List constructZkRootNodeACL(
Configuration conf, List sourceACLs) throws NoSuchAlgorithmException {
- List zkRootNodeAcl = new ArrayList();
+ List zkRootNodeAcl = new ArrayList<>();
for (ACL acl : sourceACLs) {
zkRootNodeAcl.add(new ACL(
ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
@@ -298,22 +290,15 @@ public synchronized void startInternal() throws Exception {
}
protected void createRootDir(final String rootPath) throws Exception {
- // For root dirs, we shouldn't use the doMulti helper methods
- new ZKAction() {
- @Override
- public String run() throws KeeperException, InterruptedException {
- try {
- return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.debug(rootPath + "znode already exists!");
- return null;
- } else {
- throw ke;
- }
- }
- }
- }.runWithRetries();
+    // As in the pre-Curator code, an already existing root znode is treated
+    // as success rather than an error. KeeperException is fully qualified
+    // because this patch removes its import.
+    try {
+      curatorFramework.create()
+          .withMode(CreateMode.PERSISTENT).withACL(zkAcl).forPath(rootPath);
+    } catch (org.apache.zookeeper.KeeperException.NodeExistsException nee) {
+      LOG.debug(rootPath + "znode already exists!");
+    }
}
private void logRootNodeAcls(String prefix) throws Exception {
@@ -334,43 +312,20 @@ private synchronized void fence() throws Exception {
logRootNodeAcls("Before fencing\n");
}
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
- return null;
- }
- }.runWithRetries();
-
- // delete fencingnodepath
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- try {
- zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
- } catch (KeeperException.NoNodeException nne) {
- LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
- }
- return null;
- }
- }.runWithRetries();
+    curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
+    // The fencing node is created and deleted inside each multi-op, so it is
+    // normally absent here; a missing node must not abort fencing.
+    try {
+      curatorFramework.delete().forPath(fencingNodePath);
+    } catch (org.apache.zookeeper.KeeperException.NoNodeException nne) {
+      LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
+    }
if (LOG.isTraceEnabled()) {
logRootNodeAcls("After fencing\n");
}
}
- private synchronized void closeZkClients() throws IOException {
- zkClient = null;
- if (activeZkClient != null) {
- try {
- activeZkClient.close();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while closing ZK", e);
- }
- activeZkClient = null;
- }
- }
@Override
protected synchronized void closeInternal() throws Exception {
@@ -378,7 +338,10 @@ protected synchronized void closeInternal() throws Exception {
verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000);
}
- closeZkClients();
+    // The client is only assigned by createConnection(); stay null-safe
+    // (as the removed closeZkClients() was) if close happens before that.
+    if (curatorFramework != null) {
+      curatorFramework.close();
+    }
}
@Override
@@ -404,9 +365,7 @@ protected synchronized Version loadVersion() throws Exception {
if (existsWithRetries(versionNodePath, false) != null) {
byte[] data = getDataWithRetries(versionNodePath, false);
- Version version =
- new VersionPBImpl(VersionProto.parseFrom(data));
- return version;
+ return new VersionPBImpl(VersionProto.parseFrom(data));
}
return null;
}
@@ -607,7 +566,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
}
@@ -682,7 +641,7 @@ public synchronized void removeApplicationStateInternal(
String appId = appState.getApplicationSubmissionContext().getApplicationId()
.toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
- ArrayList opList = new ArrayList();
+ ArrayList opList = new ArrayList<>();
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
@@ -701,7 +660,7 @@ public synchronized void removeApplicationStateInternal(
protected synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- ArrayList opList = new ArrayList();
+ ArrayList opList = new ArrayList<>();
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
doStoreMultiWithRetries(opList);
}
@@ -717,7 +676,7 @@ protected synchronized void removeRMDelegationTokenState(
+ rmDTIdentifier.getSequenceNumber());
}
if (existsWithRetries(nodeRemovePath, false) != null) {
- ArrayList opList = new ArrayList();
+ ArrayList opList = new ArrayList<>();
opList.add(Op.delete(nodeRemovePath, -1));
doDeleteMultiWithRetries(opList);
} else {
@@ -729,7 +688,7 @@ protected synchronized void removeRMDelegationTokenState(
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- ArrayList opList = new ArrayList();
+ ArrayList opList = new ArrayList<>();
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
@@ -764,17 +723,19 @@ private void addStoreOrUpdateOps(ArrayList opList,
if (isUpdate) {
opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
} else {
- opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
- CreateMode.PERSISTENT));
+        opList
+            .add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
+                CreateMode.PERSISTENT));
// Update Sequence number only while storing DT
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
- if (LOG.isDebugEnabled()) {
- LOG.debug((isUpdate ? "Storing " : "Updating ") +
- dtSequenceNumberPath + ". SequenceNumber: "
- + rmDTIdentifier.getSequenceNumber());
- }
- opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
+              + rmDTIdentifier.getSequenceNumber());
+        }
+        // Must stay in the store branch: seqOut is only written here, so on
+        // an update this op would overwrite the znode with an empty payload.
+        opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
}
} finally {
seqOs.close();
}
@@ -832,76 +792,6 @@ public synchronized void removeApplication(ApplicationId removeAppId)
}
}
- // ZK related code
- /**
- * Watcher implementation which forward events to the ZKRMStateStore This
- * hides the ZK methods of the store from its public interface
- */
- private final class ForwardingWatcher implements Watcher {
- private ZooKeeper watchedZkClient;
-
- public ForwardingWatcher(ZooKeeper client) {
- this.watchedZkClient = client;
- }
-
- @Override
- public void process(WatchedEvent event) {
- try {
- ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
- } catch (Throwable t) {
- LOG.error("Failed to process watcher event " + event + ": "
- + StringUtils.stringifyException(t));
- }
- }
- }
-
- @VisibleForTesting
- @Private
- @Unstable
- public synchronized void processWatchEvent(ZooKeeper zk,
- WatchedEvent event) throws Exception {
- // only process watcher event from current ZooKeeper Client session.
- if (zk != activeZkClient) {
- LOG.info("Ignore watcher event type: " + event.getType() +
- " with state:" + event.getState() + " for path:" +
- event.getPath() + " from old session");
- return;
- }
-
- Event.EventType eventType = event.getType();
- LOG.info("Watcher event type: " + eventType + " with state:"
- + event.getState() + " for path:" + event.getPath() + " for " + this);
-
- if (eventType == Event.EventType.None) {
-
- // the connection state has changed
- switch (event.getState()) {
- case SyncConnected:
- LOG.info("ZKRMStateStore Session connected");
- if (zkClient == null) {
- // the SyncConnected must be from the client that sent Disconnected
- zkClient = activeZkClient;
- ZKRMStateStore.this.notifyAll();
- LOG.info("ZKRMStateStore Session restored");
- }
- break;
- case Disconnected:
- LOG.info("ZKRMStateStore Session disconnected");
- zkClient = null;
- break;
- case Expired:
- // the connection got terminated because of session timeout
- // call listener to reconnect
- LOG.info("ZKRMStateStore Session expired");
- createConnection();
- break;
- default:
- LOG.error("Unexpected Zookeeper" +
- " watch event state: " + event.getState());
- break;
- }
- }
- }
@VisibleForTesting
@Private
@@ -916,17 +806,11 @@ String getNodePath(String root, String nodeName) {
*/
private synchronized void doStoreMultiWithRetries(
final List opList) throws Exception {
- final List execOpList = new ArrayList(opList.size() + 2);
+ final List execOpList = new ArrayList<>(opList.size() + 2);
execOpList.add(createFencingNodePathOp);
execOpList.addAll(opList);
execOpList.add(deleteFencingNodePathOp);
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.multi(execOpList);
- return null;
- }
- }.runWithRetries();
+ curatorFramework.getZookeeperClient().getZooKeeper().multi(execOpList);
}
/**
@@ -943,18 +827,11 @@ private void doStoreMultiWithRetries(final Op op) throws Exception {
*/
private synchronized void doDeleteMultiWithRetries(
final List opList) throws Exception {
- final List execOpList = new ArrayList(opList.size() + 2);
+ final List execOpList = new ArrayList<>(opList.size() + 2);
execOpList.add(createFencingNodePathOp);
execOpList.addAll(opList);
execOpList.add(deleteFencingNodePathOp);
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- setHasDeleteNodeOp(true);
- zkClient.multi(execOpList);
- return null;
- }
- }.runWithRetries();
+ curatorFramework.getZookeeperClient().getZooKeeper().multi(execOpList);
}
private void doDeleteMultiWithRetries(final Op op) throws Exception {
@@ -983,70 +860,27 @@ public void setDataWithRetries(final String path, final byte[] data,
@Unstable
public byte[] getDataWithRetries(final String path, final boolean watch)
throws Exception {
- return new ZKAction() {
- @Override
- public byte[] run() throws KeeperException, InterruptedException {
- return zkClient.getData(path, watch, null);
- }
- }.runWithRetries();
+ return curatorFramework.getData().forPath(path);
}
private List getACLWithRetries(
final String path, final Stat stat) throws Exception {
- return new ZKAction>() {
- @Override
- public List run() throws KeeperException, InterruptedException {
- return zkClient.getACL(path, stat);
- }
- }.runWithRetries();
+ return curatorFramework.getACL().forPath(path);
}
private List getChildrenWithRetries(
final String path, final boolean watch) throws Exception {
- return new ZKAction>() {
- @Override
- List run() throws KeeperException, InterruptedException {
- return zkClient.getChildren(path, watch);
- }
- }.runWithRetries();
+ return curatorFramework.getChildren().forPath(path);
}
private Stat existsWithRetries(
final String path, final boolean watch) throws Exception {
- return new ZKAction() {
- @Override
- Stat run() throws KeeperException, InterruptedException {
- return zkClient.exists(path, watch);
- }
- }.runWithRetries();
+ return curatorFramework.checkExists().forPath(path);
}
private void deleteWithRetries(
final String path, final boolean watch) throws Exception {
- new ZKAction() {
- @Override
- Void run() throws KeeperException, InterruptedException {
- recursiveDeleteWithRetriesHelper(path, watch);
- return null;
- }
- }.runWithRetries();
- }
-
- /**
- * Helper method that deletes znodes recursively
- */
- private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
- throws KeeperException, InterruptedException {
- List children = zkClient.getChildren(path, watch);
- for (String child : children) {
- recursiveDeleteWithRetriesHelper(path + "/" + child, false);
- }
-
- try {
- zkClient.delete(path, -1);
- } catch (KeeperException.NoNodeException nne) {
- LOG.info("Node " + path + " doesn't exist to delete");
- }
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
}
/**
@@ -1078,118 +912,38 @@ public void run() {
}
}
- private abstract class ZKAction {
- private boolean hasDeleteNodeOp = false;
- void setHasDeleteNodeOp(boolean hasDeleteOp) {
- this.hasDeleteNodeOp = hasDeleteOp;
- }
- // run() expects synchronization on ZKRMStateStore.this
- abstract T run() throws KeeperException, InterruptedException;
-
- T runWithCheck() throws Exception {
- long startTime = System.currentTimeMillis();
- synchronized (ZKRMStateStore.this) {
- while (zkClient == null) {
- ZKRMStateStore.this.wait(zkSessionTimeout);
- if (zkClient != null) {
- break;
- }
- if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
- throw new IOException("Wait for ZKClient creation timed out");
- }
- }
- return run();
- }
- }
-
- private boolean shouldRetry(Code code) {
- switch (code) {
- case CONNECTIONLOSS:
- case OPERATIONTIMEOUT:
- case SESSIONEXPIRED:
- case SESSIONMOVED:
- return true;
- default:
- break;
- }
- return false;
- }
-
- T runWithRetries() throws Exception {
- int retry = 0;
- while (true) {
- try {
- return runWithCheck();
- } catch (KeeperException.NoAuthException nae) {
- if (HAUtil.isHAEnabled(getConfig())) {
- // NoAuthException possibly means that this store is fenced due to
- // another RM becoming active. Even if not,
- // it is safer to assume we have been fenced
- throw new StoreFencedException();
- }
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.info("znode already exists!");
- return null;
- }
- if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
- LOG.info("znode has already been deleted!");
- return null;
- }
-
- LOG.info("Exception while executing a ZK operation.", ke);
- if (shouldRetry(ke.code()) && ++retry < numRetries) {
- LOG.info("Retrying operation on ZK. Retry no. " + retry);
- Thread.sleep(zkRetryInterval);
- createConnection();
- continue;
- }
- LOG.info("Maxed out ZK retries. Giving up!");
- throw ke;
- }
- }
- }
- }
-
private synchronized void createConnection()
throws IOException, InterruptedException {
- closeZkClients();
- for (int retries = 0; retries < numRetries && zkClient == null;
- retries++) {
- try {
- activeZkClient = getNewZooKeeper();
- zkClient = activeZkClient;
- for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
- zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
- }
- if (useDefaultFencingScheme) {
- zkClient.addAuthInfo(zkRootNodeAuthScheme,
- (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8")));
- }
- } catch (IOException ioe) {
- // Retry in case of network failures
- LOG.info("Failed to connect to the ZooKeeper on attempt - " +
- (retries + 1));
- ioe.printStackTrace();
- }
- }
- if (zkClient == null) {
- LOG.error("Unable to connect to Zookeeper");
- throw new YarnRuntimeException("Unable to connect to Zookeeper");
- }
- ZKRMStateStore.this.notifyAll();
- LOG.info("Created new ZK connection");
- }
-
- // protected to mock for testing
- @VisibleForTesting
- @Private
- @Unstable
- protected synchronized ZooKeeper getNewZooKeeper()
- throws IOException, InterruptedException {
- ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
- zk.register(new ForwardingWatcher(zk));
- return zk;
+    // Set up authorization based on fencing scheme
+    List<AuthInfo> authInfos = new ArrayList<>();
+    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+    }
+    if (useDefaultFencingScheme) {
+      byte[] defaultFencingAuth =
+          (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
+              Charset.forName("UTF-8"));
+      authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
+    }
+
+    // zkSessionTimeout was the ZooKeeper *session* timeout before this change;
+    // pass it as such. Retries sleep zkRetryInterval, as the old loop did.
+    curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .connectionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(numRetries, (int) zkRetryInterval))
+        .authorization(authInfos)
+        .build();
+    curatorFramework.start();
+    if (!curatorFramework.blockUntilConnected(
+        zkSessionTimeout, TimeUnit.MILLISECONDS)) {
+      curatorFramework.close();  // don't leak the started client on failure
+      LOG.fatal("Couldn't establish connection to ZK server");
+      throw new YarnRuntimeException("Couldn't connect to ZK server");
+    }
+
+    LOG.info("Connected to ZooKeeper");
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 4d0e560..9e0d22b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -40,7 +40,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -76,7 +75,7 @@
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
-public class RMStateStoreTestBase extends ClientBaseWithFixes{
+public class RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 333455c..351e7f1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -18,18 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
import javax.crypto.SecretKey;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -61,22 +57,51 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
private static final int ZK_TIMEOUT_MS = 1000;
+ private TestingServer curatorTestingServer;
+ private CuratorFramework curatorFramework;
+
+  /**
+   * Creates the Curator TestingServer for the test and a started Curator
+   * client that the helper classes below use to inspect znodes.
+   */
+  @Before
+  public void setupCuratorServer() {
+    try {
+      curatorTestingServer = new TestingServer();
+      curatorTestingServer.start();
+      curatorFramework = CuratorFrameworkFactory.builder()
+          .connectString(curatorTestingServer.getConnectString())
+          .retryPolicy(new RetryNTimes(100, 100))
+          .build();
+      // A built-but-unstarted client rejects every operation, so start it
+      // before any test helper calls getChildren()/checkExists()/setData().
+      curatorFramework.start();
+    } catch (Exception e) {
+      LOG.fatal("Unable to start Curator TestingServer", e);
+      throw new RuntimeException(e);
+    }
+  }
class TestZKRMStateStoreTester implements RMStateStoreHelper {
- ZooKeeper client;
TestZKRMStateStoreInternal store;
String workingZnode;
+
class TestZKRMStateStoreInternal extends ZKRMStateStore {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
@@ -86,10 +105,6 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
assertTrue(znodeWorkingPath.equals(workingZnode));
}
- @Override
- public ZooKeeper getNewZooKeeper() throws IOException {
- return client;
- }
public String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
@@ -117,23 +136,24 @@ public void testRetryingCreateRootDir() throws Exception {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
- List nodes = client.getChildren(store.znodeWorkingPath, false);
- return nodes.size() == 1;
+ return 1 ==
+ curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
}
@Override
public void writeVersion(Version version) throws Exception {
- client.setData(store.getVersionNode(), ((VersionPBImpl) version)
- .getProto().toByteArray(), -1);
+ curatorFramework.setData().withVersion(-1)
+ .forPath(store.getVersionNode(),
+ ((VersionPBImpl) version).getProto().toByteArray());
}
@Override
@@ -142,10 +162,8 @@ public Version getCurrentVersion() throws Exception {
}
public boolean appExists(RMApp app) throws Exception {
- Stat node =
- client.exists(store.getAppNode(app.getApplicationId().toString()),
- false);
- return node !=null;
+ return null != curatorFramework.checkExists()
+ .forPath(store.getAppNode(app.getApplicationId().toString()));
}
}
@@ -178,9 +196,9 @@ public Version getCurrentVersion() throws Exception {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
Version storedVersion = null;
@@ -217,7 +235,8 @@ private Configuration createHARMConf(
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
index 654b357..e270404 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
+
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -73,10 +75,11 @@
private ZKRMStateStore store;
private AMRMTokenSecretManager appTokenMgr;
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
+ private TestingServer curatorTestingServer;
@Before
public void setUpZKServer() throws Exception {
- super.setUp();
+ curatorTestingServer = new TestingServer();
}
@After
@@ -87,7 +90,7 @@ public void tearDown() throws Exception {
if (appTokenMgr != null) {
appTokenMgr.stop();
}
- super.tearDown();
+ curatorTestingServer.stop();
}
private void initStore(String hostPort) {
@@ -95,7 +98,11 @@ private void initStore(String hostPort) {
RMContext rmContext = mock(RMContext.class);
conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
+    // The argument to or(...) would be evaluated eagerly. Only touch the
+    // local test server when no address was supplied: run() does not create
+    // it unless asked to launch a local ZK.
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.isPresent()
+        ? optHostPort.get() : curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
@@ -140,7 +144,7 @@ public int run(String[] args) {
if (launchLocalZK) {
try {
- setUp();
+ setUpZKServer();
} catch (Exception e) {
System.err.println("failed to setup. : " + e.getMessage());
return -1;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
deleted file mode 100644
index 62dc5ef..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
-import org.apache.hadoop.util.ZKUtil;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestZKRMStateStoreZKClientConnections extends
- ClientBaseWithFixes {
-
- private static final int ZK_OP_WAIT_TIME = 3000;
- private static final int ZK_TIMEOUT_MS = 1000;
- private Log LOG =
- LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
-
- private static final String DIGEST_USER_PASS="test-user:test-password";
- private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
- private static final String DIGEST_USER_HASH;
- static {
- try {
- DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest(
- DIGEST_USER_PASS);
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
- private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
-
-
- class TestZKClient {
-
- ZKRMStateStore store;
- boolean forExpire = false;
- TestForwardingWatcher oldWatcher;
- TestForwardingWatcher watcher;
- CyclicBarrier syncBarrier = new CyclicBarrier(2);
-
- protected class TestZKRMStateStore extends ZKRMStateStore {
-
- public TestZKRMStateStore(Configuration conf, String workingZnode)
- throws Exception {
- init(conf);
- start();
- assertTrue(znodeWorkingPath.equals(workingZnode));
- }
-
- @Override
- public ZooKeeper getNewZooKeeper()
- throws IOException, InterruptedException {
- oldWatcher = watcher;
- watcher = new TestForwardingWatcher();
- return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
- }
-
- @Override
- public synchronized void processWatchEvent(ZooKeeper zk,
- WatchedEvent event) throws Exception {
-
- if (forExpire) {
- // a hack... couldn't find a way to trigger expired event.
- WatchedEvent expriredEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Expired, null);
- super.processWatchEvent(zk, expriredEvent);
- forExpire = false;
- syncBarrier.await();
- } else {
- super.processWatchEvent(zk, event);
- }
- }
- }
-
- private class TestForwardingWatcher extends
- ClientBaseWithFixes.CountdownWatcher {
- public void process(WatchedEvent event) {
- super.process(event);
- try {
- if (store != null) {
- store.processWatchEvent(client, event);
- }
- } catch (Throwable t) {
- LOG.error("Failed to process watcher event " + event + ": "
- + StringUtils.stringifyException(t));
- }
- }
- }
-
- public RMStateStore getRMStateStore(Configuration conf) throws Exception {
- String workingZnode = "/Test";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
- conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.store = new TestZKRMStateStore(conf, workingZnode);
- return this.store;
- }
- }
-
- @Test (timeout = 20000)
- public void testZKClientRetry() throws Exception {
- TestZKClient zkClientTester = new TestZKClient();
- final String path = "/test";
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- conf.setLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, 100);
- final ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- TestDispatcher dispatcher = new TestDispatcher();
- store.setRMDispatcher(dispatcher);
- final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
-
- stopServer();
- Thread clientThread = new Thread() {
- @Override
- public void run() {
- try {
- store.getDataWithRetries(path, true);
- } catch (Exception e) {
- e.printStackTrace();
- assertionFailedInThread.set(true);
- }
- }
- };
- Thread.sleep(2000);
- startServer();
- clientThread.join();
- Assert.assertFalse(assertionFailedInThread.get());
- }
-
- @Test(timeout = 20000)
- public void testZKClientDisconnectAndReconnect()
- throws Exception {
-
- TestZKClient zkClientTester = new TestZKClient();
- String path = "/test";
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- TestDispatcher dispatcher = new TestDispatcher();
- store.setRMDispatcher(dispatcher);
-
- // trigger watch
- store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- store.getDataWithRetries(path, true);
- store.setDataWithRetries(path, "newBytes".getBytes(), 0);
-
- stopServer();
- zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
- try {
- store.getDataWithRetries(path, true);
- fail("Expected ZKClient time out exception");
- } catch (Exception e) {
- assertTrue(e.getMessage().contains(
- "Wait for ZKClient creation timed out"));
- }
-
- // ZKRMStateStore Session restored
- startServer();
- zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
- byte[] ret = null;
- try {
- ret = store.getDataWithRetries(path, true);
- } catch (Exception e) {
- String error = "ZKRMStateStore Session restore failed";
- LOG.error(error, e);
- fail(error);
- }
- assertEquals("newBytes", new String(ret));
- }
-
- @Test(timeout = 20000)
- public void testZKSessionTimeout() throws Exception {
-
- TestZKClient zkClientTester = new TestZKClient();
- String path = "/test";
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- TestDispatcher dispatcher = new TestDispatcher();
- store.setRMDispatcher(dispatcher);
-
- // a hack to trigger expired event
- zkClientTester.forExpire = true;
-
- // trigger watch
- store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- store.getDataWithRetries(path, true);
- store.setDataWithRetries(path, "bytes".getBytes(), 0);
-
- zkClientTester.syncBarrier.await();
- // after this point, expired event has already been processed.
-
- try {
- byte[] ret = store.getDataWithRetries(path, false);
- assertEquals("bytes", new String(ret));
- } catch (Exception e) {
- String error = "New session creation failed";
- LOG.error(error, e);
- fail(error);
- }
-
- // send Disconnected event from old client session to ZKRMStateStore
- // check the current client session is not affected.
- Assert.assertTrue(zkClientTester.oldWatcher != null);
- WatchedEvent disconnectedEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Disconnected, null);
- zkClientTester.oldWatcher.process(disconnectedEvent);
- Assert.assertTrue(store.zkClient != null);
-
- zkClientTester.watcher.process(disconnectedEvent);
- Assert.assertTrue(store.zkClient == null);
- WatchedEvent connectedEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.SyncConnected, null);
- zkClientTester.watcher.process(connectedEvent);
- Assert.assertTrue(store.zkClient != null);
- Assert.assertTrue(store.zkClient == store.activeZkClient);
- }
-
- @Test(timeout = 20000)
- public void testSetZKAcl() {
- TestZKClient zkClientTester = new TestZKClient();
- YarnConfiguration conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
- try {
- zkClientTester.store.zkClient.delete(zkClientTester.store
- .znodeWorkingPath, -1);
- fail("Shouldn't be able to delete path");
- } catch (Exception e) {/* expected behavior */
- }
- }
-
- @Test(timeout = 20000)
- public void testInvalidZKAclConfiguration() {
- TestZKClient zkClientTester = new TestZKClient();
- YarnConfiguration conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
- try {
- zkClientTester.getRMStateStore(conf);
- fail("ZKRMStateStore created with bad ACL");
- } catch (ZKUtil.BadAclFormatException bafe) {
- // expected behavior
- } catch (Exception e) {
- String error = "Incorrect exception on BadAclFormat";
- LOG.error(error, e);
- fail(error);
- }
- }
-
- @Test
- public void testZKAuths() throws Exception {
- TestZKClient zkClientTester = new TestZKClient();
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1);
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL);
- conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD);
-
- zkClientTester.getRMStateStore(conf);
- }
-
- @Test
- public void testZKRetryInterval() throws Exception {
- TestZKClient zkClientTester = new TestZKClient();
- YarnConfiguration conf = new YarnConfiguration();
-
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
- store.zkRetryInterval);
- store.stop();
-
- conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
- store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
- YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
- store.zkRetryInterval);
- store.stop();
- }
-}