diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 76d280a..4960f95 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -175,6 +175,14 @@
hadoop-yarn-server-web-proxy
+ org.apache.curator
+ curator-client
+
+
+ org.apache.curator
+ curator-test
+
+
org.apache.zookeeper
zookeeper
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 15ac971..7e8689f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -22,22 +22,26 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,14 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
@@ -152,20 +149,12 @@
protected String znodeWorkingPath;
@VisibleForTesting
- protected ZooKeeper zkClient;
-
- /* activeZkClient is not used to do actual operations,
- * it is only used to verify client session for watched events and
- * it gets activated into zkClient on connection event.
- */
- @VisibleForTesting
- ZooKeeper activeZkClient;
+ protected CuratorFramework curatorFramework;
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private String fencingNodePath;
- private Op createFencingNodePathOp;
- private Op deleteFencingNodePathOp;
+
private Thread verifyActiveStatusThread;
private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong());
@@ -192,7 +181,7 @@
@Unstable
protected List constructZkRootNodeACL(
Configuration conf, List sourceACLs) throws NoSuchAlgorithmException {
- List zkRootNodeAcl = new ArrayList();
+ List zkRootNodeAcl = new ArrayList<>();
for (ACL acl : sourceACLs) {
zkRootNodeAcl.add(new ACL(
ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
@@ -243,9 +232,6 @@ public synchronized void initInternal(Configuration conf) throws Exception {
/* Initialize fencing related paths, acls, and ops */
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
- createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
- CreateMode.PERSISTENT);
- deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@@ -283,42 +269,32 @@ public synchronized void startInternal() throws Exception {
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
- createRootDir(zkRootNodePath);
+ create(zkRootNodePath);
if (HAUtil.isHAEnabled(getConfig())){
fence();
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
- createRootDir(rmAppRoot);
- createRootDir(rmDTSecretManagerRoot);
- createRootDir(dtMasterKeysRootPath);
- createRootDir(delegationTokensRootPath);
- createRootDir(dtSequenceNumberPath);
- createRootDir(amrmTokenSecretManagerRoot);
+ create(rmAppRoot);
+ create(rmDTSecretManagerRoot);
+ create(dtMasterKeysRootPath);
+ create(delegationTokensRootPath);
+ create(dtSequenceNumberPath);
+ create(amrmTokenSecretManagerRoot);
}
- protected void createRootDir(final String rootPath) throws Exception {
- // For root dirs, we shouldn't use the doMulti helper methods
- new ZKAction() {
- @Override
- public String run() throws KeeperException, InterruptedException {
- try {
- return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.debug(rootPath + "znode already exists!");
- return null;
- } else {
- throw ke;
- }
- }
- }
- }.runWithRetries();
+ @VisibleForTesting
+ void create(final String path) throws Exception {
+ if (!exists(path)) {
+ curatorFramework.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(path, null);
+ }
}
private void logRootNodeAcls(String prefix) throws Exception {
Stat getStat = new Stat();
- List getAcls = getACLWithRetries(zkRootNodePath, getStat);
+ List getAcls = getACL(zkRootNodePath);
StringBuilder builder = new StringBuilder();
builder.append(prefix);
@@ -334,51 +310,21 @@ private synchronized void fence() throws Exception {
logRootNodeAcls("Before fencing\n");
}
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
- return null;
- }
- }.runWithRetries();
-
- // delete fencingnodepath
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- try {
- zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
- } catch (KeeperException.NoNodeException nne) {
- LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
- }
- return null;
- }
- }.runWithRetries();
+ curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
+ delete(fencingNodePath);
if (LOG.isTraceEnabled()) {
logRootNodeAcls("After fencing\n");
}
}
- private synchronized void closeZkClients() throws IOException {
- zkClient = null;
- if (activeZkClient != null) {
- try {
- activeZkClient.close();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while closing ZK", e);
- }
- activeZkClient = null;
- }
- }
-
@Override
protected synchronized void closeInternal() throws Exception {
if (verifyActiveStatusThread != null) {
verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000);
}
- closeZkClients();
+ curatorFramework.close();
}
@Override
@@ -391,10 +337,10 @@ protected synchronized void storeVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
- if (existsWithRetries(versionNodePath, false) != null) {
- setDataWithRetries(versionNodePath, data, -1);
+ if (exists(versionNodePath)) {
+ safeSetData(versionNodePath, data, -1);
} else {
- createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
+ safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
}
}
@@ -402,11 +348,9 @@ protected synchronized void storeVersion() throws Exception {
protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
- if (existsWithRetries(versionNodePath, false) != null) {
- byte[] data = getDataWithRetries(versionNodePath, false);
- Version version =
- new VersionPBImpl(VersionProto.parseFrom(data));
- return version;
+ if (exists(versionNodePath)) {
+ byte[] data = getData(versionNodePath);
+ return new VersionPBImpl(VersionProto.parseFrom(data));
}
return null;
}
@@ -415,20 +359,20 @@ protected synchronized Version loadVersion() throws Exception {
public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = 0;
- if (existsWithRetries(epochNodePath, false) != null) {
+ if (exists(epochNodePath)) {
// load current epoch
- byte[] data = getDataWithRetries(epochNodePath, false);
+ byte[] data = getData(epochNodePath);
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
- setDataWithRetries(epochNodePath, storeData, -1);
+ safeSetData(epochNodePath, storeData, -1);
} else {
// initialize epoch node with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
- createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+ safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
}
return currentEpoch;
}
@@ -447,7 +391,7 @@ public synchronized RMState loadState() throws Exception {
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
- byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false);
+ byte[] data = getData(amrmTokenSecretManagerRoot);
if (data == null) {
LOG.warn("There is no data saved");
return;
@@ -458,7 +402,6 @@ private void loadAMRMTokenSecretManagerState(RMState rmState)
rmState.amrmTokenSecretManagerState =
AMRMTokenSecretManagerState.newInstance(
stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
-
}
private synchronized void loadRMDTSecretManagerState(RMState rmState)
@@ -470,10 +413,10 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState)
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
List childNodes =
- getChildrenWithRetries(dtMasterKeysRootPath, false);
+ getChildren(dtMasterKeysRootPath);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
+ byte[] childData = getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@@ -500,7 +443,7 @@ private void loadRMDelegationKeyState(RMState rmState) throws Exception {
}
private void loadRMSequentialNumberState(RMState rmState) throws Exception {
- byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+ byte[] seqData = getData(dtSequenceNumberPath);
if (seqData != null) {
ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
DataInputStream seqIn = new DataInputStream(seqIs);
@@ -515,11 +458,11 @@ private void loadRMSequentialNumberState(RMState rmState) throws Exception {
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List childNodes =
- getChildrenWithRetries(delegationTokensRootPath, false);
+ getChildren(delegationTokensRootPath);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
+ byte[] childData = getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@@ -551,10 +494,10 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
- List childNodes = getChildrenWithRetries(rmAppRoot, false);
+ List childNodes = getChildren(rmAppRoot);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
+ byte[] childData = getData(childNodePath);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
@@ -581,11 +524,11 @@ private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
- List attempts = getChildrenWithRetries(appPath, false);
+ List attempts = getChildren(appPath);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
- byte[] attemptData = getDataWithRetries(attemptPath, false);
+ byte[] attemptData = getData(attemptPath);
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
@@ -606,8 +549,8 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- createWithRetries(nodeCreatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeCreatePath, appStateData, zkAcl,
+ CreateMode.PERSISTENT);
}
@@ -622,11 +565,11 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId,
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- if (existsWithRetries(nodeUpdatePath, false) != null) {
- setDataWithRetries(nodeUpdatePath, appStateData, -1);
+ if (exists(nodeUpdatePath)) {
+ safeSetData(nodeUpdatePath, appStateData, -1);
} else {
- createWithRetries(nodeUpdatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeUpdatePath, appStateData, zkAcl,
+ CreateMode.PERSISTENT);
LOG.debug(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
}
@@ -646,8 +589,8 @@ public synchronized void storeApplicationAttemptStateInternal(
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
- createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeCreatePath, attemptStateData, zkAcl,
+ CreateMode.PERSISTENT);
}
@Override
@@ -665,11 +608,11 @@ public synchronized void updateApplicationAttemptStateInternal(
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
- if (existsWithRetries(nodeUpdatePath, false) != null) {
- setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
+ if (exists(nodeUpdatePath)) {
+ safeSetData(nodeUpdatePath, attemptStateData, -1);
} else {
- createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
+ CreateMode.PERSISTENT);
LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
@@ -682,28 +625,23 @@ public synchronized void removeApplicationStateInternal(
String appId = appState.getApplicationSubmissionContext().getApplicationId()
.toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
- ArrayList opList = new ArrayList();
-
- for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
- String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
- opList.add(Op.delete(attemptRemovePath, -1));
- }
- opList.add(Op.delete(appIdRemovePath, -1));
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
}
- doDeleteMultiWithRetries(opList);
+
+ // TODO: Check deleting appIdRemovePath works recursively
+ safeDelete(appIdRemovePath);
}
@Override
protected synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- ArrayList opList = new ArrayList();
- addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
- doStoreMultiWithRetries(opList);
+ SafeTransaction trx = new SafeTransaction();
+ addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
+ trx.commit();
}
@Override
@@ -716,35 +654,29 @@ protected synchronized void removeRMDelegationTokenState(
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
- if (existsWithRetries(nodeRemovePath, false) != null) {
- ArrayList opList = new ArrayList();
- opList.add(Op.delete(nodeRemovePath, -1));
- doDeleteMultiWithRetries(opList);
- } else {
- LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
- }
+ safeDelete(nodeRemovePath);
}
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- ArrayList opList = new ArrayList();
+ SafeTransaction trx = new SafeTransaction();
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
- if (existsWithRetries(nodeRemovePath, false) == null) {
+ if (exists(nodeRemovePath)) {
+ // in case znode exists
+ addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
+ } else {
// in case znode doesn't exist
- addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
+ addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
- } else {
- // in case znode exists
- addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
}
- doStoreMultiWithRetries(opList);
+ trx.commit();
}
- private void addStoreOrUpdateOps(ArrayList opList,
+ private void addStoreOrUpdateOps(SafeTransaction trx,
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
boolean isUpdate) throws Exception {
// store RM delegation token
@@ -762,18 +694,18 @@ private void addStoreOrUpdateOps(ArrayList opList,
}
if (isUpdate) {
- opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
+ trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
} else {
- opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
- CreateMode.PERSISTENT));
+ trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
+ CreateMode.PERSISTENT);
// Update Sequence number only while storing DT
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") +
- dtSequenceNumberPath + ". SequenceNumber: "
- + rmDTIdentifier.getSequenceNumber());
+ dtSequenceNumberPath + ". SequenceNumber: "
+ + rmDTIdentifier.getSequenceNumber());
}
- opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+ trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
}
} finally {
seqOs.close();
@@ -793,7 +725,7 @@ protected synchronized void storeRMDTMasterKeyState(
}
delegationKey.write(fsOut);
try {
- createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
+ safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT);
} finally {
os.close();
@@ -809,243 +741,116 @@ protected synchronized void removeRMDTMasterKeyState(
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
- if (existsWithRetries(nodeRemovePath, false) != null) {
- doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
- } else {
- LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
- }
+ safeDelete(nodeRemovePath);
}
@Override
public synchronized void deleteStore() throws Exception {
- if (existsWithRetries(zkRootNodePath, false) != null) {
- deleteWithRetries(zkRootNodePath, false);
- }
+ delete(zkRootNodePath);
}
@Override
public synchronized void removeApplication(ApplicationId removeAppId)
throws Exception {
String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
- if (existsWithRetries(appIdRemovePath, false) != null) {
- deleteWithRetries(appIdRemovePath, false);
- }
+ delete(appIdRemovePath);
}
- // ZK related code
- /**
- * Watcher implementation which forward events to the ZKRMStateStore This
- * hides the ZK methods of the store from its public interface
- */
- private final class ForwardingWatcher implements Watcher {
- private ZooKeeper watchedZkClient;
-
- public ForwardingWatcher(ZooKeeper client) {
- this.watchedZkClient = client;
- }
-
- @Override
- public void process(WatchedEvent event) {
- try {
- ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
- } catch (Throwable t) {
- LOG.error("Failed to process watcher event " + event + ": "
- + StringUtils.stringifyException(t));
- }
- }
- }
@VisibleForTesting
- @Private
- @Unstable
- public synchronized void processWatchEvent(ZooKeeper zk,
- WatchedEvent event) throws Exception {
- // only process watcher event from current ZooKeeper Client session.
- if (zk != activeZkClient) {
- LOG.info("Ignore watcher event type: " + event.getType() +
- " with state:" + event.getState() + " for path:" +
- event.getPath() + " from old session");
- return;
- }
-
- Event.EventType eventType = event.getType();
- LOG.info("Watcher event type: " + eventType + " with state:"
- + event.getState() + " for path:" + event.getPath() + " for " + this);
-
- if (eventType == Event.EventType.None) {
-
- // the connection state has changed
- switch (event.getState()) {
- case SyncConnected:
- LOG.info("ZKRMStateStore Session connected");
- if (zkClient == null) {
- // the SyncConnected must be from the client that sent Disconnected
- zkClient = activeZkClient;
- ZKRMStateStore.this.notifyAll();
- LOG.info("ZKRMStateStore Session restored");
- }
- break;
- case Disconnected:
- LOG.info("ZKRMStateStore Session disconnected");
- zkClient = null;
- break;
- case Expired:
- // the connection got terminated because of session timeout
- // call listener to reconnect
- LOG.info("ZKRMStateStore Session expired");
- createConnection();
- break;
- default:
- LOG.error("Unexpected Zookeeper" +
- " watch event state: " + event.getState());
- break;
- }
- }
- }
-
- @VisibleForTesting
- @Private
- @Unstable
String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
/**
- * Helper method that creates fencing node, executes the passed operations,
- * and deletes the fencing node.
+ * Use curator transactions to ensure zk-operations are performed in an all
+ * or nothing fashion. This is equivalent to using ZooKeeper#multi.
+ *
+ * TODO: Curator 3.0 introduces CuratorOp similar to Op. We ll have to
+ * rewrite this inner class when we adopt that.
*/
- private synchronized void doStoreMultiWithRetries(
- final List opList) throws Exception {
- final List execOpList = new ArrayList(opList.size() + 2);
- execOpList.add(createFencingNodePathOp);
- execOpList.addAll(opList);
- execOpList.add(deleteFencingNodePathOp);
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.multi(execOpList);
- return null;
- }
- }.runWithRetries();
- }
+ private class SafeTransaction {
+ private CuratorTransactionFinal transactionFinal;
+ private boolean create = false, delete = false;
- /**
- * Helper method that creates fencing node, executes the passed operation,
- * and deletes the fencing node.
- */
- private void doStoreMultiWithRetries(final Op op) throws Exception {
- doStoreMultiWithRetries(Collections.singletonList(op));
- }
+ SafeTransaction() throws Exception {
+ CuratorTransaction transaction = curatorFramework.inTransaction();
+ transactionFinal =
+ transaction.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(fencingNodePath, new byte[0]).and();
+ }
- /**
- * Helper method that creates fencing node, executes the passed
- * delete related operations and deletes the fencing node.
- */
- private synchronized void doDeleteMultiWithRetries(
- final List opList) throws Exception {
- final List execOpList = new ArrayList(opList.size() + 2);
- execOpList.add(createFencingNodePathOp);
- execOpList.addAll(opList);
- execOpList.add(deleteFencingNodePathOp);
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- setHasDeleteNodeOp(true);
- zkClient.multi(execOpList);
- return null;
- }
- }.runWithRetries();
- }
+ public void commit() throws Exception {
+ transactionFinal = transactionFinal.delete()
+ .forPath(fencingNodePath).and();
+ transactionFinal.commit();
+ }
- private void doDeleteMultiWithRetries(final Op op) throws Exception {
- doDeleteMultiWithRetries(Collections.singletonList(op));
+ public void create(String path, byte[] data, List acl, CreateMode mode)
+ throws Exception {
+ transactionFinal = transactionFinal.create()
+ .withMode(mode).withACL(acl).forPath(path, data).and();
+ create = true;
+ }
+
+ public void delete(String path) throws Exception {
+ transactionFinal = transactionFinal.delete().forPath(path).and();
+ delete = true;
+ }
+
+ public void setData(String path, byte[] data, int version)
+ throws Exception {
+ transactionFinal = transactionFinal.setData()
+ .withVersion(version).forPath(path, data).and();
+ }
}
- @VisibleForTesting
- @Private
- @Unstable
- public void createWithRetries(
- final String path, final byte[] data, final List acl,
- final CreateMode mode) throws Exception {
- doStoreMultiWithRetries(Op.create(path, data, acl, mode));
+ private void safeCreate(String path, byte[] data, List acl,
+ CreateMode mode) throws Exception {
+ if (!exists(path)) {
+ SafeTransaction transaction = new SafeTransaction();
+ transaction.create(path, data, acl, mode);
+ transaction.commit();
+ }
}
- @VisibleForTesting
- @Private
- @Unstable
- public void setDataWithRetries(final String path, final byte[] data,
- final int version) throws Exception {
- doStoreMultiWithRetries(Op.setData(path, data, version));
+ private void safeDelete(final String path) throws Exception {
+ if (exists(path)) {
+ SafeTransaction transaction = new SafeTransaction();
+ transaction.delete(path);
+ transaction.commit();
+ }
}
- @VisibleForTesting
- @Private
- @Unstable
- public byte[] getDataWithRetries(final String path, final boolean watch)
+ private void safeSetData(String path, byte[] data, int version)
throws Exception {
- return new ZKAction() {
- @Override
- public byte[] run() throws KeeperException, InterruptedException {
- return zkClient.getData(path, watch, null);
- }
- }.runWithRetries();
+ SafeTransaction transaction = new SafeTransaction();
+ transaction.setData(path, data, version);
+ transaction.commit();
}
- private List getACLWithRetries(
- final String path, final Stat stat) throws Exception {
- return new ZKAction>() {
- @Override
- public List run() throws KeeperException, InterruptedException {
- return zkClient.getACL(path, stat);
- }
- }.runWithRetries();
+ @VisibleForTesting
+ byte[] getData(final String path) throws Exception {
+ return curatorFramework.getData().forPath(path);
}
- private List getChildrenWithRetries(
- final String path, final boolean watch) throws Exception {
- return new ZKAction>() {
- @Override
- List run() throws KeeperException, InterruptedException {
- return zkClient.getChildren(path, watch);
- }
- }.runWithRetries();
+ private List getACL(final String path) throws Exception {
+ return curatorFramework.getACL().forPath(path);
}
- private Stat existsWithRetries(
- final String path, final boolean watch) throws Exception {
- return new ZKAction() {
- @Override
- Stat run() throws KeeperException, InterruptedException {
- return zkClient.exists(path, watch);
- }
- }.runWithRetries();
+ private List getChildren(final String path) throws Exception {
+ return curatorFramework.getChildren().forPath(path);
}
- private void deleteWithRetries(
- final String path, final boolean watch) throws Exception {
- new ZKAction() {
- @Override
- Void run() throws KeeperException, InterruptedException {
- recursiveDeleteWithRetriesHelper(path, watch);
- return null;
- }
- }.runWithRetries();
+ private boolean exists(final String path) throws Exception {
+ return curatorFramework.checkExists().forPath(path) != null;
}
- /**
- * Helper method that deletes znodes recursively
- */
- private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
- throws KeeperException, InterruptedException {
- List children = zkClient.getChildren(path, watch);
- for (String child : children) {
- recursiveDeleteWithRetriesHelper(path + "/" + child, false);
- }
-
- try {
- zkClient.delete(path, -1);
- } catch (KeeperException.NoNodeException nne) {
- LOG.info("Node " + path + " doesn't exist to delete");
+ @VisibleForTesting
+ void delete(final String path) throws Exception {
+ if (exists(path)) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
}
}
@@ -1054,8 +859,6 @@ private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
* this RM continues to be the Active.
*/
private class VerifyActiveStatusThread extends Thread {
- private List emptyOpList = new ArrayList();
-
VerifyActiveStatusThread() {
super(VerifyActiveStatusThread.class.getName());
}
@@ -1066,7 +869,8 @@ public void run() {
if(isFencedState()) {
break;
}
- doStoreMultiWithRetries(emptyOpList);
+ // Create and delete fencing node
+ new SafeTransaction().commit();
Thread.sleep(zkSessionTimeout);
}
} catch (InterruptedException ie) {
@@ -1078,118 +882,37 @@ public void run() {
}
}
- private abstract class ZKAction {
- private boolean hasDeleteNodeOp = false;
- void setHasDeleteNodeOp(boolean hasDeleteOp) {
- this.hasDeleteNodeOp = hasDeleteOp;
- }
- // run() expects synchronization on ZKRMStateStore.this
- abstract T run() throws KeeperException, InterruptedException;
-
- T runWithCheck() throws Exception {
- long startTime = System.currentTimeMillis();
- synchronized (ZKRMStateStore.this) {
- while (zkClient == null) {
- ZKRMStateStore.this.wait(zkSessionTimeout);
- if (zkClient != null) {
- break;
- }
- if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
- throw new IOException("Wait for ZKClient creation timed out");
- }
- }
- return run();
- }
- }
+ private void createConnection() throws Exception {
+ // Curator connection
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ builder = builder.connectString(zkHostPort)
+ .connectionTimeoutMs(zkSessionTimeout)
+ .retryPolicy(
+ new RetryNTimes(numRetries, zkSessionTimeout / numRetries));
- private boolean shouldRetry(Code code) {
- switch (code) {
- case CONNECTIONLOSS:
- case OPERATIONTIMEOUT:
- case SESSIONEXPIRED:
- case SESSIONMOVED:
- return true;
- default:
- break;
- }
- return false;
+ // Set up authorization based on fencing scheme
+ List authInfos = new ArrayList<>();
+ for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+ authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
}
-
- T runWithRetries() throws Exception {
- int retry = 0;
- while (true) {
- try {
- return runWithCheck();
- } catch (KeeperException.NoAuthException nae) {
- if (HAUtil.isHAEnabled(getConfig())) {
- // NoAuthException possibly means that this store is fenced due to
- // another RM becoming active. Even if not,
- // it is safer to assume we have been fenced
- throw new StoreFencedException();
- }
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.info("znode already exists!");
- return null;
- }
- if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
- LOG.info("znode has already been deleted!");
- return null;
- }
-
- LOG.info("Exception while executing a ZK operation.", ke);
- if (shouldRetry(ke.code()) && ++retry < numRetries) {
- LOG.info("Retrying operation on ZK. Retry no. " + retry);
- Thread.sleep(zkRetryInterval);
- createConnection();
- continue;
- }
- LOG.info("Maxed out ZK retries. Giving up!");
- throw ke;
- }
- }
+ if (useDefaultFencingScheme) {
+ byte[] defaultFencingAuth =
+ (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
+ Charset.forName("UTF-8"));
+ authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
}
- }
+ builder = builder.authorization(authInfos);
- private synchronized void createConnection()
- throws IOException, InterruptedException {
- closeZkClients();
- for (int retries = 0; retries < numRetries && zkClient == null;
- retries++) {
- try {
- activeZkClient = getNewZooKeeper();
- zkClient = activeZkClient;
- for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
- zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
- }
- if (useDefaultFencingScheme) {
- zkClient.addAuthInfo(zkRootNodeAuthScheme,
- (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8")));
- }
- } catch (IOException ioe) {
- // Retry in case of network failures
- LOG.info("Failed to connect to the ZooKeeper on attempt - " +
- (retries + 1));
- ioe.printStackTrace();
- }
- }
- if (zkClient == null) {
- LOG.error("Unable to connect to Zookeeper");
- throw new YarnRuntimeException("Unable to connect to Zookeeper");
+ // Connect to ZK
+ curatorFramework = builder.build();
+ curatorFramework.start();
+ if (!curatorFramework.blockUntilConnected(
+ zkSessionTimeout, TimeUnit.MILLISECONDS)) {
+ LOG.fatal("Couldn't establish connection to ZK server");
+ throw new YarnRuntimeException("Couldn't connect to ZK server");
}
- ZKRMStateStore.this.notifyAll();
- LOG.info("Created new ZK connection");
- }
- // protected to mock for testing
- @VisibleForTesting
- @Private
- @Unstable
- protected synchronized ZooKeeper getNewZooKeeper()
- throws IOException, InterruptedException {
- ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
- zk.register(new ForwardingWatcher(zk));
- return zk;
+ LOG.info("Connected to ZooKeeper");
}
@Override
@@ -1199,7 +922,7 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState data =
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
- setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
+ safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
}
/**
@@ -1213,7 +936,7 @@ private void createRootDirRecursively(String path) throws Exception {
StringBuilder sb = new StringBuilder();
for (int i = 1; i < pathParts.length; i++) {
sb.append("/").append(pathParts[i]);
- createRootDir(sb.toString());
+ create(sb.toString());
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 4d0e560..9e0d22b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -40,7 +40,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -76,7 +75,7 @@
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
-public class RMStateStoreTestBase extends ClientBaseWithFixes{
+public class RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 333455c..974007c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -18,18 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.crypto.SecretKey;
-
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -50,6 +42,7 @@
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -61,22 +54,51 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import javax.crypto.SecretKey;
+
+
+
public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
private static final int ZK_TIMEOUT_MS = 1000;
+ private TestingServer curatorTestingServer;
+ private CuratorFramework curatorFramework;
+
+ @Before
+ public void setupCuratorServer() throws Exception {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(curatorTestingServer.getConnectString())
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+ }
+
+ @After
+ public void cleanupCuratorServer() throws IOException {
+ curatorFramework.close();
+ curatorTestingServer.stop();
+ }
class TestZKRMStateStoreTester implements RMStateStoreHelper {
- ZooKeeper client;
TestZKRMStateStoreInternal store;
String workingZnode;
+
class TestZKRMStateStoreInternal extends ZKRMStateStore {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
@@ -86,10 +108,10 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
assertTrue(znodeWorkingPath.equals(workingZnode));
}
- @Override
- public ZooKeeper getNewZooKeeper() throws IOException {
- return client;
- }
+// @Override
+// public ZooKeeper getNewZooKeeper() throws IOException {
+// return client;
+// }
public String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
@@ -109,7 +131,7 @@ public String getAppNode(String appId) {
* @throws Exception
*/
public void testRetryingCreateRootDir() throws Exception {
- createRootDir(znodeWorkingPath);
+ create(znodeWorkingPath);
}
}
@@ -117,23 +139,24 @@ public void testRetryingCreateRootDir() throws Exception {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
- List nodes = client.getChildren(store.znodeWorkingPath, false);
- return nodes.size() == 1;
+ return 1 ==
+ curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
}
@Override
public void writeVersion(Version version) throws Exception {
- client.setData(store.getVersionNode(), ((VersionPBImpl) version)
- .getProto().toByteArray(), -1);
+ curatorFramework.setData().withVersion(-1)
+ .forPath(store.getVersionNode(),
+ ((VersionPBImpl) version).getProto().toByteArray());
}
@Override
@@ -142,10 +165,8 @@ public Version getCurrentVersion() throws Exception {
}
public boolean appExists(RMApp app) throws Exception {
- Stat node =
- client.exists(store.getAppNode(app.getApplicationId().toString()),
- false);
- return node !=null;
+ return null != curatorFramework.checkExists()
+ .forPath(store.getAppNode(app.getApplicationId().toString()));
}
}
@@ -178,9 +199,9 @@ public Version getCurrentVersion() throws Exception {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
Version storedVersion = null;
@@ -217,7 +238,8 @@ private Configuration createHARMConf(
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
index 654b357..e270404 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
+
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -73,10 +75,11 @@
private ZKRMStateStore store;
private AMRMTokenSecretManager appTokenMgr;
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
+ private TestingServer curatorTestingServer;
@Before
public void setUpZKServer() throws Exception {
- super.setUp();
+ curatorTestingServer = new TestingServer();
}
@After
@@ -87,7 +90,7 @@ public void tearDown() throws Exception {
if (appTokenMgr != null) {
appTokenMgr.stop();
}
- super.tearDown();
+ curatorTestingServer.stop();
}
private void initStore(String hostPort) {
@@ -95,7 +98,8 @@ private void initStore(String hostPort) {
RMContext rmContext = mock(RMContext.class);
conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ optHostPort.or(curatorTestingServer.getConnectString()));
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
@@ -140,7 +144,7 @@ public int run(String[] args) {
if (launchLocalZK) {
try {
- setUp();
+ setUpZKServer();
} catch (Exception e) {
System.err.println("failed to setup. : " + e.getMessage());
return -1;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 62dc5ef..8db93e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -20,39 +20,32 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class TestZKRMStateStoreZKClientConnections extends
- ClientBaseWithFixes {
-
- private static final int ZK_OP_WAIT_TIME = 3000;
- private static final int ZK_TIMEOUT_MS = 1000;
+public class TestZKRMStateStoreZKClientConnections {
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
+ private static final int ZK_TIMEOUT_MS = 1000;
private static final String DIGEST_USER_PASS="test-user:test-password";
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
private static final String DIGEST_USER_HASH;
@@ -66,14 +59,22 @@
}
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
+ private TestingServer testingServer;
+
+ @Before
+ public void setupZKServer() throws Exception {
+ testingServer = new TestingServer();
+ testingServer.start();
+ }
+
+ @After
+ public void cleanupZKServer() throws Exception {
+ testingServer.stop();
+ }
class TestZKClient {
ZKRMStateStore store;
- boolean forExpire = false;
- TestForwardingWatcher oldWatcher;
- TestForwardingWatcher watcher;
- CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
@@ -83,51 +84,12 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
-
- @Override
- public ZooKeeper getNewZooKeeper()
- throws IOException, InterruptedException {
- oldWatcher = watcher;
- watcher = new TestForwardingWatcher();
- return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
- }
-
- @Override
- public synchronized void processWatchEvent(ZooKeeper zk,
- WatchedEvent event) throws Exception {
-
- if (forExpire) {
- // a hack... couldn't find a way to trigger expired event.
- WatchedEvent expriredEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Expired, null);
- super.processWatchEvent(zk, expriredEvent);
- forExpire = false;
- syncBarrier.await();
- } else {
- super.processWatchEvent(zk, event);
- }
- }
- }
-
- private class TestForwardingWatcher extends
- ClientBaseWithFixes.CountdownWatcher {
- public void process(WatchedEvent event) {
- super.process(event);
- try {
- if (store != null) {
- store.processWatchEvent(client, event);
- }
- } catch (Throwable t) {
- LOG.error("Failed to process watcher event " + event + ": "
- + StringUtils.stringifyException(t));
- }
- }
}
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ testingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
@@ -147,12 +109,12 @@ public void testZKClientRetry() throws Exception {
store.setRMDispatcher(dispatcher);
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
- stopServer();
+ testingServer.stop();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
- store.getDataWithRetries(path, true);
+ store.getData(path);
} catch (Exception e) {
e.printStackTrace();
assertionFailedInThread.set(true);
@@ -160,55 +122,12 @@ public void run() {
}
};
Thread.sleep(2000);
- startServer();
+ testingServer.start();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
}
@Test(timeout = 20000)
- public void testZKClientDisconnectAndReconnect()
- throws Exception {
-
- TestZKClient zkClientTester = new TestZKClient();
- String path = "/test";
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- TestDispatcher dispatcher = new TestDispatcher();
- store.setRMDispatcher(dispatcher);
-
- // trigger watch
- store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- store.getDataWithRetries(path, true);
- store.setDataWithRetries(path, "newBytes".getBytes(), 0);
-
- stopServer();
- zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
- try {
- store.getDataWithRetries(path, true);
- fail("Expected ZKClient time out exception");
- } catch (Exception e) {
- assertTrue(e.getMessage().contains(
- "Wait for ZKClient creation timed out"));
- }
-
- // ZKRMStateStore Session restored
- startServer();
- zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
- byte[] ret = null;
- try {
- ret = store.getDataWithRetries(path, true);
- } catch (Exception e) {
- String error = "ZKRMStateStore Session restore failed";
- LOG.error(error, e);
- fail(error);
- }
- assertEquals("newBytes", new String(ret));
- }
-
- @Test(timeout = 20000)
public void testZKSessionTimeout() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
@@ -220,44 +139,24 @@ public void testZKSessionTimeout() throws Exception {
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
- // a hack to trigger expired event
- zkClientTester.forExpire = true;
-
- // trigger watch
- store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- store.getDataWithRetries(path, true);
- store.setDataWithRetries(path, "bytes".getBytes(), 0);
+ store.curatorFramework.create()
+ .withMode(CreateMode.PERSISTENT)
+ .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+ .forPath(path, null);
+ store.curatorFramework.getData().forPath(path);
+ store.curatorFramework.setData()
+ .withVersion(0).forPath(path, "newBytes".getBytes());
- zkClientTester.syncBarrier.await();
- // after this point, expired event has already been processed.
+ Thread.sleep(ZK_TIMEOUT_MS);
try {
- byte[] ret = store.getDataWithRetries(path, false);
- assertEquals("bytes", new String(ret));
+ byte[] ret = store.getData(path);
+ assertEquals("newBytes", new String(ret));
} catch (Exception e) {
String error = "New session creation failed";
LOG.error(error, e);
fail(error);
}
-
- // send Disconnected event from old client session to ZKRMStateStore
- // check the current client session is not affected.
- Assert.assertTrue(zkClientTester.oldWatcher != null);
- WatchedEvent disconnectedEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Disconnected, null);
- zkClientTester.oldWatcher.process(disconnectedEvent);
- Assert.assertTrue(store.zkClient != null);
-
- zkClientTester.watcher.process(disconnectedEvent);
- Assert.assertTrue(store.zkClient == null);
- WatchedEvent connectedEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.SyncConnected, null);
- zkClientTester.watcher.process(connectedEvent);
- Assert.assertTrue(store.zkClient != null);
- Assert.assertTrue(store.zkClient == store.activeZkClient);
}
@Test(timeout = 20000)
@@ -266,8 +165,8 @@ public void testSetZKAcl() {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
try {
- zkClientTester.store.zkClient.delete(zkClientTester.store
- .znodeWorkingPath, -1);
+ zkClientTester.store.delete(zkClientTester.store
+ .znodeWorkingPath);
fail("Shouldn't be able to delete path");
} catch (Exception e) {/* expected behavior */
}