diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 76d280a..4960f95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -175,6 +175,14 @@ hadoop-yarn-server-web-proxy + org.apache.curator + curator-client + + + org.apache.curator + curator-test + + org.apache.zookeeper zookeeper diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 15ac971..1d7be58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -22,22 +22,26 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.IOException; import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import 
org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.retry.RetryNTimes; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -64,14 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; @@ -152,20 +149,12 @@ protected String znodeWorkingPath; @VisibleForTesting - protected ZooKeeper zkClient; - - /* activeZkClient is not used to do actual operations, - * it is only used to verify client session for watched events and - * it gets activated into zkClient on connection event. 
- */ - @VisibleForTesting - ZooKeeper activeZkClient; + protected CuratorFramework curatorFramework; /** Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private String fencingNodePath; - private Op createFencingNodePathOp; - private Op deleteFencingNodePathOp; + private Thread verifyActiveStatusThread; private String zkRootNodeUsername; private final String zkRootNodePassword = Long.toString(random.nextLong()); @@ -192,7 +181,7 @@ @Unstable protected List constructZkRootNodeACL( Configuration conf, List sourceACLs) throws NoSuchAlgorithmException { - List zkRootNodeAcl = new ArrayList(); + List zkRootNodeAcl = new ArrayList<>(); for (ACL acl : sourceACLs) { zkRootNodeAcl.add(new ACL( ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), @@ -243,9 +232,6 @@ public synchronized void initInternal(Configuration conf) throws Exception { /* Initialize fencing related paths, acls, and ops */ fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); - createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl, - CreateMode.PERSISTENT); - deleteFencingNodePathOp = Op.delete(fencingNodePath, -1); if (HAUtil.isHAEnabled(conf)) { String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); @@ -283,42 +269,31 @@ public synchronized void startInternal() throws Exception { // ensure root dirs exist createRootDirRecursively(znodeWorkingPath); - createRootDir(zkRootNodePath); + create(zkRootNodePath); if (HAUtil.isHAEnabled(getConfig())){ fence(); verifyActiveStatusThread = new VerifyActiveStatusThread(); verifyActiveStatusThread.start(); } - createRootDir(rmAppRoot); - createRootDir(rmDTSecretManagerRoot); - createRootDir(dtMasterKeysRootPath); - createRootDir(delegationTokensRootPath); - createRootDir(dtSequenceNumberPath); - createRootDir(amrmTokenSecretManagerRoot); + create(rmAppRoot); + create(rmDTSecretManagerRoot); + 
create(dtMasterKeysRootPath); + create(delegationTokensRootPath); + create(dtSequenceNumberPath); + create(amrmTokenSecretManagerRoot); } - protected void createRootDir(final String rootPath) throws Exception { - // For root dirs, we shouldn't use the doMulti helper methods - new ZKAction() { - @Override - public String run() throws KeeperException, InterruptedException { - try { - return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); - } catch (KeeperException ke) { - if (ke.code() == Code.NODEEXISTS) { - LOG.debug(rootPath + "znode already exists!"); - return null; - } else { - throw ke; - } - } - } - }.runWithRetries(); + protected void create(final String path) throws Exception { + if (!exists(path)) { + curatorFramework.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl) + .forPath(path, null); + } } private void logRootNodeAcls(String prefix) throws Exception { Stat getStat = new Stat(); - List getAcls = getACLWithRetries(zkRootNodePath, getStat); + List getAcls = getACL(zkRootNodePath); StringBuilder builder = new StringBuilder(); builder.append(prefix); @@ -334,51 +309,21 @@ private synchronized void fence() throws Exception { logRootNodeAcls("Before fencing\n"); } - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1); - return null; - } - }.runWithRetries(); - - // delete fencingnodepath - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - try { - zkClient.multi(Collections.singletonList(deleteFencingNodePathOp)); - } catch (KeeperException.NoNodeException nne) { - LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete"); - } - return null; - } - }.runWithRetries(); + curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath); + delete(fencingNodePath); if (LOG.isTraceEnabled()) { logRootNodeAcls("After fencing\n"); } } - private synchronized void 
closeZkClients() throws IOException { - zkClient = null; - if (activeZkClient != null) { - try { - activeZkClient.close(); - } catch (InterruptedException e) { - throw new IOException("Interrupted while closing ZK", e); - } - activeZkClient = null; - } - } - @Override protected synchronized void closeInternal() throws Exception { if (verifyActiveStatusThread != null) { verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.join(1000); } - closeZkClients(); + curatorFramework.close(); } @Override @@ -391,10 +336,10 @@ protected synchronized void storeVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); - if (existsWithRetries(versionNodePath, false) != null) { - setDataWithRetries(versionNodePath, data, -1); + if (exists(versionNodePath)) { + safeSetData(versionNodePath, data, -1); } else { - createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); + safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); } } @@ -402,11 +347,9 @@ protected synchronized void storeVersion() throws Exception { protected synchronized Version loadVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); - if (existsWithRetries(versionNodePath, false) != null) { - byte[] data = getDataWithRetries(versionNodePath, false); - Version version = - new VersionPBImpl(VersionProto.parseFrom(data)); - return version; + if (exists(versionNodePath)) { + byte[] data = getData(versionNodePath); + return new VersionPBImpl(VersionProto.parseFrom(data)); } return null; } @@ -415,20 +358,20 @@ protected synchronized Version loadVersion() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception { String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); long currentEpoch = 0; - if (existsWithRetries(epochNodePath, false) != null) { + if (exists(epochNodePath)) { // load current epoch 
- byte[] data = getDataWithRetries(epochNodePath, false); + byte[] data = getData(epochNodePath); Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - setDataWithRetries(epochNodePath, storeData, -1); + safeSetData(epochNodePath, storeData, -1); } else { // initialize epoch node with 1 for the next time. byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); + safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); } return currentEpoch; } @@ -447,7 +390,7 @@ public synchronized RMState loadState() throws Exception { private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception { - byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false); + byte[] data = getData(amrmTokenSecretManagerRoot); if (data == null) { LOG.warn("There is no data saved"); return; @@ -458,7 +401,6 @@ private void loadAMRMTokenSecretManagerState(RMState rmState) rmState.amrmTokenSecretManagerState = AMRMTokenSecretManagerState.newInstance( stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); - } private synchronized void loadRMDTSecretManagerState(RMState rmState) @@ -470,10 +412,10 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) private void loadRMDelegationKeyState(RMState rmState) throws Exception { List childNodes = - getChildrenWithRetries(dtMasterKeysRootPath, false); + getChildren(dtMasterKeysRootPath); for (String childNodeName : childNodes) { String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, false); + byte[] childData = getData(childNodePath); if (childData == null) { LOG.warn("Content of " + childNodePath + " is broken."); @@ -500,7 +442,7 @@ private void 
loadRMDelegationKeyState(RMState rmState) throws Exception { } private void loadRMSequentialNumberState(RMState rmState) throws Exception { - byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false); + byte[] seqData = getData(dtSequenceNumberPath); if (seqData != null) { ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); DataInputStream seqIn = new DataInputStream(seqIs); @@ -515,11 +457,11 @@ private void loadRMSequentialNumberState(RMState rmState) throws Exception { private void loadRMDelegationTokenState(RMState rmState) throws Exception { List childNodes = - getChildrenWithRetries(delegationTokensRootPath, false); + getChildren(delegationTokensRootPath); for (String childNodeName : childNodes) { String childNodePath = getNodePath(delegationTokensRootPath, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, false); + byte[] childData = getData(childNodePath); if (childData == null) { LOG.warn("Content of " + childNodePath + " is broken."); @@ -551,10 +493,10 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception { } private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = getChildrenWithRetries(rmAppRoot, false); + List childNodes = getChildren(rmAppRoot); for (String childNodeName : childNodes) { String childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, false); + byte[] childData = getData(childNodePath); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application if (LOG.isDebugEnabled()) { @@ -581,11 +523,11 @@ private void loadApplicationAttemptState(ApplicationStateData appState, ApplicationId appId) throws Exception { String appPath = getNodePath(rmAppRoot, appId.toString()); - List attempts = getChildrenWithRetries(appPath, false); + List attempts = getChildren(appPath); for (String attemptIDStr : attempts) { if 
(attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { String attemptPath = getNodePath(appPath, attemptIDStr); - byte[] attemptData = getDataWithRetries(attemptPath, false); + byte[] attemptData = getData(attemptPath); ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( @@ -606,8 +548,8 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - createWithRetries(nodeCreatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); + safeCreate(nodeCreatePath, appStateData, zkAcl, + CreateMode.PERSISTENT); } @@ -622,11 +564,11 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId, } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - if (existsWithRetries(nodeUpdatePath, false) != null) { - setDataWithRetries(nodeUpdatePath, appStateData, -1); + if (exists(nodeUpdatePath)) { + safeSetData(nodeUpdatePath, appStateData, -1); } else { - createWithRetries(nodeUpdatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); + safeCreate(nodeUpdatePath, appStateData, zkAcl, + CreateMode.PERSISTENT); LOG.debug(appId + " znode didn't exist. 
Created a new znode to" + " update the application state."); } @@ -646,8 +588,8 @@ public synchronized void storeApplicationAttemptStateInternal( + nodeCreatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - createWithRetries(nodeCreatePath, attemptStateData, zkAcl, - CreateMode.PERSISTENT); + safeCreate(nodeCreatePath, attemptStateData, zkAcl, + CreateMode.PERSISTENT); } @Override @@ -665,11 +607,11 @@ public synchronized void updateApplicationAttemptStateInternal( } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - if (existsWithRetries(nodeUpdatePath, false) != null) { - setDataWithRetries(nodeUpdatePath, attemptStateData, -1); + if (exists(nodeUpdatePath)) { + safeSetData(nodeUpdatePath, attemptStateData, -1); } else { - createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, - CreateMode.PERSISTENT); + safeCreate(nodeUpdatePath, attemptStateData, zkAcl, + CreateMode.PERSISTENT); LOG.debug(appAttemptId + " znode didn't exist. 
Created a new znode to" + " update the application attempt state."); } @@ -682,28 +624,23 @@ public synchronized void removeApplicationStateInternal( String appId = appState.getApplicationSubmissionContext().getApplicationId() .toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); - ArrayList opList = new ArrayList(); - - for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { - String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); - opList.add(Op.delete(attemptRemovePath, -1)); - } - opList.add(Op.delete(appIdRemovePath, -1)); if (LOG.isDebugEnabled()) { LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath + " and its attempts."); } - doDeleteMultiWithRetries(opList); + + // TODO: Check deleting appIdRemovePath works recursively + safeDelete(appIdRemovePath); } @Override protected synchronized void storeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - ArrayList opList = new ArrayList(); - addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false); - doStoreMultiWithRetries(opList); + SafeTransaction trx = new SafeTransaction(); + addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); + trx.commit(); } @Override @@ -716,35 +653,29 @@ protected synchronized void removeRMDelegationTokenState( LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - if (existsWithRetries(nodeRemovePath, false) != null) { - ArrayList opList = new ArrayList(); - opList.add(Op.delete(nodeRemovePath, -1)); - doDeleteMultiWithRetries(opList); - } else { - LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); - } + safeDelete(nodeRemovePath); } @Override protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - ArrayList opList = new ArrayList(); + SafeTransaction trx = new SafeTransaction(); String nodeRemovePath = 
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - if (existsWithRetries(nodeRemovePath, false) == null) { + if (exists(nodeRemovePath)) { + // in case znode exists + addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true); + } else { // in case znode doesn't exist - addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false); + addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); - } else { - // in case znode exists - addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true); } - doStoreMultiWithRetries(opList); + trx.commit(); } - private void addStoreOrUpdateOps(ArrayList opList, + private void addStoreOrUpdateOps(SafeTransaction trx, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, boolean isUpdate) throws Exception { // store RM delegation token @@ -762,19 +693,19 @@ private void addStoreOrUpdateOps(ArrayList opList, } if (isUpdate) { - opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1)); + trx.setData(nodeCreatePath, identifierData.toByteArray(), -1); } else { - opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, - CreateMode.PERSISTENT)); + trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, + CreateMode.PERSISTENT); // Update Sequence number only while storing DT seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); - if (LOG.isDebugEnabled()) { - LOG.debug((isUpdate ? "Storing " : "Updating ") + - dtSequenceNumberPath + ". SequenceNumber: " - + rmDTIdentifier.getSequenceNumber()); - } - opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1)); } + if (LOG.isDebugEnabled()) { + LOG.debug((isUpdate ? "Storing " : "Updating ") + + dtSequenceNumberPath + ". 
SequenceNumber: " + + rmDTIdentifier.getSequenceNumber()); + } + trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1); } finally { seqOs.close(); } @@ -793,7 +724,7 @@ protected synchronized void storeRMDTMasterKeyState( } delegationKey.write(fsOut); try { - createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl, + safeCreate(nodeCreatePath, os.toByteArray(), zkAcl, CreateMode.PERSISTENT); } finally { os.close(); @@ -809,243 +740,114 @@ protected synchronized void removeRMDTMasterKeyState( if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - if (existsWithRetries(nodeRemovePath, false) != null) { - doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1)); - } else { - LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); - } + safeDelete(nodeRemovePath); } @Override public synchronized void deleteStore() throws Exception { - if (existsWithRetries(zkRootNodePath, false) != null) { - deleteWithRetries(zkRootNodePath, false); - } + delete(zkRootNodePath); } @Override public synchronized void removeApplication(ApplicationId removeAppId) throws Exception { String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString()); - if (existsWithRetries(appIdRemovePath, false) != null) { - deleteWithRetries(appIdRemovePath, false); - } + delete(appIdRemovePath); } - // ZK related code - /** - * Watcher implementation which forward events to the ZKRMStateStore This - * hides the ZK methods of the store from its public interface - */ - private final class ForwardingWatcher implements Watcher { - private ZooKeeper watchedZkClient; - - public ForwardingWatcher(ZooKeeper client) { - this.watchedZkClient = client; - } - - @Override - public void process(WatchedEvent event) { - try { - ZKRMStateStore.this.processWatchEvent(watchedZkClient, event); - } catch (Throwable t) { - LOG.error("Failed to process watcher event " + event + ": " - + StringUtils.stringifyException(t)); - } - } - } - - @VisibleForTesting 
- @Private - @Unstable - public synchronized void processWatchEvent(ZooKeeper zk, - WatchedEvent event) throws Exception { - // only process watcher event from current ZooKeeper Client session. - if (zk != activeZkClient) { - LOG.info("Ignore watcher event type: " + event.getType() + - " with state:" + event.getState() + " for path:" + - event.getPath() + " from old session"); - return; - } - - Event.EventType eventType = event.getType(); - LOG.info("Watcher event type: " + eventType + " with state:" - + event.getState() + " for path:" + event.getPath() + " for " + this); - - if (eventType == Event.EventType.None) { - - // the connection state has changed - switch (event.getState()) { - case SyncConnected: - LOG.info("ZKRMStateStore Session connected"); - if (zkClient == null) { - // the SyncConnected must be from the client that sent Disconnected - zkClient = activeZkClient; - ZKRMStateStore.this.notifyAll(); - LOG.info("ZKRMStateStore Session restored"); - } - break; - case Disconnected: - LOG.info("ZKRMStateStore Session disconnected"); - zkClient = null; - break; - case Expired: - // the connection got terminated because of session timeout - // call listener to reconnect - LOG.info("ZKRMStateStore Session expired"); - createConnection(); - break; - default: - LOG.error("Unexpected Zookeeper" + - " watch event state: " + event.getState()); - break; - } - } - } @VisibleForTesting - @Private - @Unstable String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); } /** - * Helper method that creates fencing node, executes the passed operations, - * and deletes the fencing node. + * Use curator transactions to ensure zk-operations are performed in an all + * or nothing fashion. This is equivalent to using ZooKeeper#multi. + * + * TODO: Curator 3.0 introduces CuratorOp similar to Op. We'll have to + * rewrite this inner class when we adopt that. 
*/ - private synchronized void doStoreMultiWithRetries( - final List opList) throws Exception { - final List execOpList = new ArrayList(opList.size() + 2); - execOpList.add(createFencingNodePathOp); - execOpList.addAll(opList); - execOpList.add(deleteFencingNodePathOp); - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.multi(execOpList); - return null; - } - }.runWithRetries(); - } + private class SafeTransaction { + private CuratorTransactionFinal transactionFinal; + private boolean create = false, delete = false; - /** - * Helper method that creates fencing node, executes the passed operation, - * and deletes the fencing node. - */ - private void doStoreMultiWithRetries(final Op op) throws Exception { - doStoreMultiWithRetries(Collections.singletonList(op)); - } + SafeTransaction() throws Exception { + CuratorTransaction transaction = curatorFramework.inTransaction(); + transactionFinal = + transaction.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl) + .forPath(fencingNodePath, new byte[0]).and(); + } - /** - * Helper method that creates fencing node, executes the passed - * delete related operations and deletes the fencing node. 
- */ - private synchronized void doDeleteMultiWithRetries( - final List opList) throws Exception { - final List execOpList = new ArrayList(opList.size() + 2); - execOpList.add(createFencingNodePathOp); - execOpList.addAll(opList); - execOpList.add(deleteFencingNodePathOp); - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - setHasDeleteNodeOp(true); - zkClient.multi(execOpList); - return null; - } - }.runWithRetries(); - } + public void commit() throws Exception { + transactionFinal = transactionFinal.delete() + .forPath(fencingNodePath).and(); + transactionFinal.commit(); + } + + public void create(String path, byte[] data, List acl, CreateMode mode) + throws Exception { + transactionFinal = transactionFinal.create() + .withMode(mode).withACL(acl).forPath(path, data).and(); + create = true; + } + + public void delete(String path) throws Exception { + transactionFinal = transactionFinal.delete().forPath(path).and(); + delete = true; + } - private void doDeleteMultiWithRetries(final Op op) throws Exception { - doDeleteMultiWithRetries(Collections.singletonList(op)); + public void setData(String path, byte[] data, int version) + throws Exception { + transactionFinal = transactionFinal.setData() + .withVersion(version).forPath(path, data).and(); + } } - @VisibleForTesting - @Private - @Unstable - public void createWithRetries( - final String path, final byte[] data, final List acl, - final CreateMode mode) throws Exception { - doStoreMultiWithRetries(Op.create(path, data, acl, mode)); + private void safeCreate(String path, byte[] data, List acl, + CreateMode mode) throws Exception { + if (!exists(path)) { + SafeTransaction transaction = new SafeTransaction(); + transaction.create(path, data, acl, mode); + transaction.commit(); + } } - @VisibleForTesting - @Private - @Unstable - public void setDataWithRetries(final String path, final byte[] data, - final int version) throws Exception { - 
doStoreMultiWithRetries(Op.setData(path, data, version)); + private void safeDelete(final String path) throws Exception { + if (exists(path)) { + SafeTransaction transaction = new SafeTransaction(); + transaction.delete(path); + transaction.commit(); + } } - @VisibleForTesting - @Private - @Unstable - public byte[] getDataWithRetries(final String path, final boolean watch) + private void safeSetData(String path, byte[] data, int version) throws Exception { - return new ZKAction() { - @Override - public byte[] run() throws KeeperException, InterruptedException { - return zkClient.getData(path, watch, null); - } - }.runWithRetries(); + SafeTransaction transaction = new SafeTransaction(); + transaction.setData(path, data, version); + transaction.commit(); } - private List getACLWithRetries( - final String path, final Stat stat) throws Exception { - return new ZKAction>() { - @Override - public List run() throws KeeperException, InterruptedException { - return zkClient.getACL(path, stat); - } - }.runWithRetries(); + private byte[] getData(final String path) throws Exception { + return curatorFramework.getData().forPath(path); } - private List getChildrenWithRetries( - final String path, final boolean watch) throws Exception { - return new ZKAction>() { - @Override - List run() throws KeeperException, InterruptedException { - return zkClient.getChildren(path, watch); - } - }.runWithRetries(); + private List getACL(final String path) throws Exception { + return curatorFramework.getACL().forPath(path); } - private Stat existsWithRetries( - final String path, final boolean watch) throws Exception { - return new ZKAction() { - @Override - Stat run() throws KeeperException, InterruptedException { - return zkClient.exists(path, watch); - } - }.runWithRetries(); + private List getChildren(final String path) throws Exception { + return curatorFramework.getChildren().forPath(path); } - private void deleteWithRetries( - final String path, final boolean watch) throws Exception { - 
new ZKAction() { - @Override - Void run() throws KeeperException, InterruptedException { - recursiveDeleteWithRetriesHelper(path, watch); - return null; - } - }.runWithRetries(); + private boolean exists(final String path) throws Exception { + return curatorFramework.checkExists().forPath(path) != null; } - /** - * Helper method that deletes znodes recursively - */ - private void recursiveDeleteWithRetriesHelper(String path, boolean watch) - throws KeeperException, InterruptedException { - List children = zkClient.getChildren(path, watch); - for (String child : children) { - recursiveDeleteWithRetriesHelper(path + "/" + child, false); - } - - try { - zkClient.delete(path, -1); - } catch (KeeperException.NoNodeException nne) { - LOG.info("Node " + path + " doesn't exist to delete"); + private void delete(final String path) throws Exception { + if (exists(path)) { + curatorFramework.delete().deletingChildrenIfNeeded().forPath(path); } } @@ -1054,8 +856,6 @@ private void recursiveDeleteWithRetriesHelper(String path, boolean watch) * this RM continues to be the Active. 
*/ private class VerifyActiveStatusThread extends Thread { - private List emptyOpList = new ArrayList(); - VerifyActiveStatusThread() { super(VerifyActiveStatusThread.class.getName()); } @@ -1066,7 +866,8 @@ public void run() { if(isFencedState()) { break; } - doStoreMultiWithRetries(emptyOpList); + // Create and delete fencing node + new SafeTransaction().commit(); Thread.sleep(zkSessionTimeout); } } catch (InterruptedException ie) { @@ -1078,118 +879,37 @@ public void run() { } } - private abstract class ZKAction { - private boolean hasDeleteNodeOp = false; - void setHasDeleteNodeOp(boolean hasDeleteOp) { - this.hasDeleteNodeOp = hasDeleteOp; - } - // run() expects synchronization on ZKRMStateStore.this - abstract T run() throws KeeperException, InterruptedException; - - T runWithCheck() throws Exception { - long startTime = System.currentTimeMillis(); - synchronized (ZKRMStateStore.this) { - while (zkClient == null) { - ZKRMStateStore.this.wait(zkSessionTimeout); - if (zkClient != null) { - break; - } - if (System.currentTimeMillis() - startTime > zkSessionTimeout) { - throw new IOException("Wait for ZKClient creation timed out"); - } - } - return run(); - } - } + private void createConnection() throws Exception { + // Curator connection + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + builder = builder.connectString(zkHostPort) + .connectionTimeoutMs(zkSessionTimeout) + .retryPolicy( + new RetryNTimes(numRetries, zkSessionTimeout / numRetries)); - private boolean shouldRetry(Code code) { - switch (code) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - case SESSIONEXPIRED: - case SESSIONMOVED: - return true; - default: - break; - } - return false; + // Set up authorization based on fencing scheme + List authInfos = new ArrayList<>(); + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); } - - T runWithRetries() throws Exception { - int retry = 0; - while (true) { - try 
{ - return runWithCheck(); - } catch (KeeperException.NoAuthException nae) { - if (HAUtil.isHAEnabled(getConfig())) { - // NoAuthException possibly means that this store is fenced due to - // another RM becoming active. Even if not, - // it is safer to assume we have been fenced - throw new StoreFencedException(); - } - } catch (KeeperException ke) { - if (ke.code() == Code.NODEEXISTS) { - LOG.info("znode already exists!"); - return null; - } - if (hasDeleteNodeOp && ke.code() == Code.NONODE) { - LOG.info("znode has already been deleted!"); - return null; - } - - LOG.info("Exception while executing a ZK operation.", ke); - if (shouldRetry(ke.code()) && ++retry < numRetries) { - LOG.info("Retrying operation on ZK. Retry no. " + retry); - Thread.sleep(zkRetryInterval); - createConnection(); - continue; - } - LOG.info("Maxed out ZK retries. Giving up!"); - throw ke; - } - } + if (useDefaultFencingScheme) { + byte[] defaultFencingAuth = + (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes( + Charset.forName("UTF-8")); + authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth)); } - } + builder = builder.authorization(authInfos); - private synchronized void createConnection() - throws IOException, InterruptedException { - closeZkClients(); - for (int retries = 0; retries < numRetries && zkClient == null; - retries++) { - try { - activeZkClient = getNewZooKeeper(); - zkClient = activeZkClient; - for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { - zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); - } - if (useDefaultFencingScheme) { - zkClient.addAuthInfo(zkRootNodeAuthScheme, - (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8"))); - } - } catch (IOException ioe) { - // Retry in case of network failures - LOG.info("Failed to connect to the ZooKeeper on attempt - " + - (retries + 1)); - ioe.printStackTrace(); - } - } - if (zkClient == null) { - LOG.error("Unable to connect to Zookeeper"); - throw new 
YarnRuntimeException("Unable to connect to Zookeeper"); + // Connect to ZK + curatorFramework = builder.build(); + curatorFramework.start(); + if (!curatorFramework.blockUntilConnected( + zkSessionTimeout, TimeUnit.MILLISECONDS)) { + LOG.fatal("Couldn't establish connection to ZK server"); + throw new YarnRuntimeException("Couldn't connect to ZK server"); } - ZKRMStateStore.this.notifyAll(); - LOG.info("Created new ZK connection"); - } - // protected to mock for testing - @VisibleForTesting - @Private - @Unstable - protected synchronized ZooKeeper getNewZooKeeper() - throws IOException, InterruptedException { - ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); - zk.register(new ForwardingWatcher(zk)); - return zk; + LOG.info("Connected to ZooKeeper"); } @Override @@ -1199,7 +919,7 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState( AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); byte[] stateData = data.getProto().toByteArray(); - setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1); + safeSetData(amrmTokenSecretManagerRoot, stateData, -1); } /** @@ -1213,7 +933,7 @@ private void createRootDirRecursively(String path) throws Exception { StringBuilder sb = new StringBuilder(); for (int i = 1; i < pathParts.length; i++) { sb.append("/").append(pathParts[i]); - createRootDir(sb.toString()); + create(sb.toString()); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 4d0e560..9e0d22b 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -40,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -76,7 +75,7 @@ import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.util.ConverterUtils; -public class RMStateStoreTestBase extends ClientBaseWithFixes{ +public class RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 333455c..974007c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -18,18 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import static org.junit.Assert.assertEquals; -import static 
org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; - -import javax.crypto.SecretKey; - +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -50,6 +42,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -61,22 +54,51 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import javax.crypto.SecretKey; + + + public class TestZKRMStateStore extends RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); private static final int ZK_TIMEOUT_MS = 1000; + private TestingServer curatorTestingServer; + private CuratorFramework 
curatorFramework; + + @Before + public void setupCuratorServer() throws Exception { + curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + curatorFramework = CuratorFrameworkFactory.builder() + .connectString(curatorTestingServer.getConnectString()) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + curatorFramework.start(); + } + + @After + public void cleanupCuratorServer() throws IOException { + curatorFramework.close(); + curatorTestingServer.stop(); + } class TestZKRMStateStoreTester implements RMStateStoreHelper { - ZooKeeper client; TestZKRMStateStoreInternal store; String workingZnode; + class TestZKRMStateStoreInternal extends ZKRMStateStore { public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) @@ -86,10 +108,10 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) assertTrue(znodeWorkingPath.equals(workingZnode)); } - @Override - public ZooKeeper getNewZooKeeper() throws IOException { - return client; - } +// @Override +// public ZooKeeper getNewZooKeeper() throws IOException { +// return client; +// } public String getVersionNode() { return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; @@ -109,7 +131,7 @@ public String getAppNode(String appId) { * @throws Exception */ public void testRetryingCreateRootDir() throws Exception { - createRootDir(znodeWorkingPath); + create(znodeWorkingPath); } } @@ -117,23 +139,24 @@ public void testRetryingCreateRootDir() throws Exception { public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); workingZnode = "/jira/issue/3077/rmstore"; - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + curatorTestingServer.getConnectString()); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.client = createClient(); this.store = new TestZKRMStateStoreInternal(conf, workingZnode); return this.store; } @Override public 
boolean isFinalStateValid() throws Exception { - List nodes = client.getChildren(store.znodeWorkingPath, false); - return nodes.size() == 1; + return 1 == + curatorFramework.getChildren().forPath(store.znodeWorkingPath).size(); } @Override public void writeVersion(Version version) throws Exception { - client.setData(store.getVersionNode(), ((VersionPBImpl) version) - .getProto().toByteArray(), -1); + curatorFramework.setData().withVersion(-1) + .forPath(store.getVersionNode(), + ((VersionPBImpl) version).getProto().toByteArray()); } @Override @@ -142,10 +165,8 @@ public Version getCurrentVersion() throws Exception { } public boolean appExists(RMApp app) throws Exception { - Stat node = - client.exists(store.getAppNode(app.getApplicationId().toString()), - false); - return node !=null; + return null != curatorFramework.checkExists() + .forPath(store.getAppNode(app.getApplicationId().toString())); } } @@ -178,9 +199,9 @@ public Version getCurrentVersion() throws Exception { public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); workingZnode = "/jira/issue/3077/rmstore"; - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + curatorTestingServer.getConnectString()); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.client = createClient(); this.store = new TestZKRMStateStoreInternal(conf, workingZnode) { Version storedVersion = null; @@ -217,7 +238,8 @@ private Configuration createHARMConf( conf.set(YarnConfiguration.RM_HA_IDS, rmIds); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + curatorTestingServer.getConnectString()); conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); conf.set(YarnConfiguration.RM_HA_ID, rmId); 
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java index 654b357..e270404 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; import javax.crypto.SecretKey; + +import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; @@ -73,10 +75,11 @@ private ZKRMStateStore store; private AMRMTokenSecretManager appTokenMgr; private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr; + private TestingServer curatorTestingServer; @Before public void setUpZKServer() throws Exception { - super.setUp(); + curatorTestingServer = new TestingServer(); } @After @@ -87,7 +90,7 @@ public void tearDown() throws Exception { if (appTokenMgr != null) { appTokenMgr.stop(); } - super.tearDown(); + curatorTestingServer.stop(); } private void initStore(String hostPort) { @@ -95,7 +98,8 @@ private void initStore(String hostPort) { RMContext rmContext = mock(RMContext.class); conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort)); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + optHostPort.or(curatorTestingServer.getConnectString())); 
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); store = new ZKRMStateStore(); @@ -140,7 +144,7 @@ public int run(String[] args) { if (launchLocalZK) { try { - setUp(); + setUpZKServer(); } catch (Exception e) { System.err.println("failed to setup. : " + e.getMessage()); return -1; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java deleted file mode 100644 index 62dc5ef..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.ClientBaseWithFixes; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher; -import org.apache.hadoop.util.ZKUtil; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.security.NoSuchAlgorithmException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class TestZKRMStateStoreZKClientConnections extends - ClientBaseWithFixes { - - private static final int ZK_OP_WAIT_TIME = 3000; - private static final int ZK_TIMEOUT_MS = 1000; - private Log LOG = - LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class); - - private static final String DIGEST_USER_PASS="test-user:test-password"; - private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS; - private static final String DIGEST_USER_HASH; - static { - try { - DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest( - DIGEST_USER_PASS); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - } - private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda"; - - - class TestZKClient { - - ZKRMStateStore store; - boolean forExpire = false; - TestForwardingWatcher oldWatcher; - 
TestForwardingWatcher watcher; - CyclicBarrier syncBarrier = new CyclicBarrier(2); - - protected class TestZKRMStateStore extends ZKRMStateStore { - - public TestZKRMStateStore(Configuration conf, String workingZnode) - throws Exception { - init(conf); - start(); - assertTrue(znodeWorkingPath.equals(workingZnode)); - } - - @Override - public ZooKeeper getNewZooKeeper() - throws IOException, InterruptedException { - oldWatcher = watcher; - watcher = new TestForwardingWatcher(); - return createClient(watcher, hostPort, ZK_TIMEOUT_MS); - } - - @Override - public synchronized void processWatchEvent(ZooKeeper zk, - WatchedEvent event) throws Exception { - - if (forExpire) { - // a hack... couldn't find a way to trigger expired event. - WatchedEvent expriredEvent = new WatchedEvent( - Watcher.Event.EventType.None, - Watcher.Event.KeeperState.Expired, null); - super.processWatchEvent(zk, expriredEvent); - forExpire = false; - syncBarrier.await(); - } else { - super.processWatchEvent(zk, event); - } - } - } - - private class TestForwardingWatcher extends - ClientBaseWithFixes.CountdownWatcher { - public void process(WatchedEvent event) { - super.process(event); - try { - if (store != null) { - store.processWatchEvent(client, event); - } - } catch (Throwable t) { - LOG.error("Failed to process watcher event " + event + ": " - + StringUtils.stringifyException(t)); - } - } - } - - public RMStateStore getRMStateStore(Configuration conf) throws Exception { - String workingZnode = "/Test"; - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); - conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.store = new TestZKRMStateStore(conf, workingZnode); - return this.store; - } - } - - @Test (timeout = 20000) - public void testZKClientRetry() throws Exception { - TestZKClient zkClientTester = new TestZKClient(); - final String path = "/test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 
ZK_TIMEOUT_MS); - conf.setLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, 100); - final ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - TestDispatcher dispatcher = new TestDispatcher(); - store.setRMDispatcher(dispatcher); - final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); - - stopServer(); - Thread clientThread = new Thread() { - @Override - public void run() { - try { - store.getDataWithRetries(path, true); - } catch (Exception e) { - e.printStackTrace(); - assertionFailedInThread.set(true); - } - } - }; - Thread.sleep(2000); - startServer(); - clientThread.join(); - Assert.assertFalse(assertionFailedInThread.get()); - } - - @Test(timeout = 20000) - public void testZKClientDisconnectAndReconnect() - throws Exception { - - TestZKClient zkClientTester = new TestZKClient(); - String path = "/test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - TestDispatcher dispatcher = new TestDispatcher(); - store.setRMDispatcher(dispatcher); - - // trigger watch - store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - store.getDataWithRetries(path, true); - store.setDataWithRetries(path, "newBytes".getBytes(), 0); - - stopServer(); - zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME); - try { - store.getDataWithRetries(path, true); - fail("Expected ZKClient time out exception"); - } catch (Exception e) { - assertTrue(e.getMessage().contains( - "Wait for ZKClient creation timed out")); - } - - // ZKRMStateStore Session restored - startServer(); - zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME); - byte[] ret = null; - try { - ret = store.getDataWithRetries(path, true); - } catch (Exception e) { - String error = "ZKRMStateStore Session restore failed"; - LOG.error(error, e); - fail(error); - } - 
assertEquals("newBytes", new String(ret)); - } - - @Test(timeout = 20000) - public void testZKSessionTimeout() throws Exception { - - TestZKClient zkClientTester = new TestZKClient(); - String path = "/test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - TestDispatcher dispatcher = new TestDispatcher(); - store.setRMDispatcher(dispatcher); - - // a hack to trigger expired event - zkClientTester.forExpire = true; - - // trigger watch - store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - store.getDataWithRetries(path, true); - store.setDataWithRetries(path, "bytes".getBytes(), 0); - - zkClientTester.syncBarrier.await(); - // after this point, expired event has already been processed. - - try { - byte[] ret = store.getDataWithRetries(path, false); - assertEquals("bytes", new String(ret)); - } catch (Exception e) { - String error = "New session creation failed"; - LOG.error(error, e); - fail(error); - } - - // send Disconnected event from old client session to ZKRMStateStore - // check the current client session is not affected. 
- Assert.assertTrue(zkClientTester.oldWatcher != null); - WatchedEvent disconnectedEvent = new WatchedEvent( - Watcher.Event.EventType.None, - Watcher.Event.KeeperState.Disconnected, null); - zkClientTester.oldWatcher.process(disconnectedEvent); - Assert.assertTrue(store.zkClient != null); - - zkClientTester.watcher.process(disconnectedEvent); - Assert.assertTrue(store.zkClient == null); - WatchedEvent connectedEvent = new WatchedEvent( - Watcher.Event.EventType.None, - Watcher.Event.KeeperState.SyncConnected, null); - zkClientTester.watcher.process(connectedEvent); - Assert.assertTrue(store.zkClient != null); - Assert.assertTrue(store.zkClient == store.activeZkClient); - } - - @Test(timeout = 20000) - public void testSetZKAcl() { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca"); - try { - zkClientTester.store.zkClient.delete(zkClientTester.store - .znodeWorkingPath, -1); - fail("Shouldn't be able to delete path"); - } catch (Exception e) {/* expected behavior */ - } - } - - @Test(timeout = 20000) - public void testInvalidZKAclConfiguration() { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*"); - try { - zkClientTester.getRMStateStore(conf); - fail("ZKRMStateStore created with bad ACL"); - } catch (ZKUtil.BadAclFormatException bafe) { - // expected behavior - } catch (Exception e) { - String error = "Incorrect exception on BadAclFormat"; - LOG.error(error, e); - fail(error); - } - } - - @Test - public void testZKAuths() throws Exception { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL); - 
conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD); - - zkClientTester.getRMStateStore(conf); - } - - @Test - public void testZKRetryInterval() throws Exception { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - - ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS, - store.zkRetryInterval); - store.stop(); - - conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); - store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS / - YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES, - store.zkRetryInterval); - store.stop(); - } -}