diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 76d280a..4960f95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -175,6 +175,14 @@ hadoop-yarn-server-web-proxy + org.apache.curator + curator-client + + + org.apache.curator + curator-test + + org.apache.zookeeper zookeeper diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 15ac971..bf82972 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -29,15 +29,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -64,14 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.Op; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; @@ -152,20 +150,14 @@ protected String znodeWorkingPath; @VisibleForTesting - protected ZooKeeper zkClient; - - /* activeZkClient is not used to do actual operations, - * it is only used to verify client session for watched events and - * it gets activated into zkClient on connection event. - */ - @VisibleForTesting - ZooKeeper activeZkClient; + protected CuratorFramework curatorFramework; /** Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private String fencingNodePath; private Op createFencingNodePathOp; private Op deleteFencingNodePathOp; + private Thread verifyActiveStatusThread; private String zkRootNodeUsername; private final String zkRootNodePassword = Long.toString(random.nextLong()); @@ -192,7 +184,7 @@ @Unstable protected List constructZkRootNodeACL( Configuration conf, List sourceACLs) throws NoSuchAlgorithmException { - List zkRootNodeAcl = new ArrayList(); + List zkRootNodeAcl = new ArrayList<>(); for (ACL acl : sourceACLs) { zkRootNodeAcl.add(new ACL( ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), @@ -298,22 +290,8 @@ public synchronized void startInternal() throws Exception { } protected void createRootDir(final String rootPath) throws Exception { - // For root dirs, we shouldn't use the doMulti helper methods - new ZKAction() { - @Override - public String run() throws KeeperException, InterruptedException { - try { - return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); - } catch (KeeperException ke) { - if (ke.code() == Code.NODEEXISTS) { - LOG.debug(rootPath + "znode already exists!"); - return null; - } else { - throw ke; - } - } - } - }.runWithRetries(); + curatorFramework.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl).forPath(rootPath); } private void logRootNodeAcls(String prefix) throws Exception { @@ -334,43 +312,25 @@ private synchronized void fence() throws Exception { logRootNodeAcls("Before fencing\n"); } - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1); - return null; - } - }.runWithRetries(); - - // delete fencingnodepath - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - try { - zkClient.multi(Collections.singletonList(deleteFencingNodePathOp)); - } catch (KeeperException.NoNodeException nne) { - LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete"); - } - return null; - } - }.runWithRetries(); + curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath); + curatorFramework.delete().forPath(fencingNodePath); if (LOG.isTraceEnabled()) { logRootNodeAcls("After fencing\n"); } } - private synchronized void closeZkClients() throws IOException { - zkClient = null; - if (activeZkClient != null) { - try { - activeZkClient.close(); - } catch (InterruptedException e) { - throw new IOException("Interrupted while closing ZK", e); - } - activeZkClient = null; - } - } +// private synchronized void closeZkClients() throws IOException { +// zkClient = null; +// if (activeZkClient != null) { +// try { +// activeZkClient.close(); +// } catch (InterruptedException e) { +// throw new IOException("Interrupted while closing ZK", e); +// } +// activeZkClient = null; +// } +// } @Override protected synchronized void closeInternal() throws Exception { @@ -378,7 +338,8 @@ protected synchronized void closeInternal() throws Exception { verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.join(1000); } - closeZkClients(); + curatorFramework.close(); +// closeZkClients(); } @Override @@ -404,9 +365,7 @@ protected synchronized Version loadVersion() throws Exception { if (existsWithRetries(versionNodePath, false) != null) { byte[] data = getDataWithRetries(versionNodePath, false); - Version version = - new VersionPBImpl(VersionProto.parseFrom(data)); - return version; + return new VersionPBImpl(VersionProto.parseFrom(data)); } return null; } @@ -607,7 +566,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, } byte[] appStateData = appStateDataPB.getProto().toByteArray(); createWithRetries(nodeCreatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); + CreateMode.PERSISTENT); } @@ -682,7 +641,7 @@ public synchronized void removeApplicationStateInternal( String appId = appState.getApplicationSubmissionContext().getApplicationId() .toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); - ArrayList opList = new ArrayList(); + ArrayList opList = new ArrayList<>(); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); @@ -701,7 +660,7 @@ public synchronized void removeApplicationStateInternal( protected synchronized void storeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - ArrayList opList = new ArrayList(); + ArrayList opList = new ArrayList<>(); addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false); doStoreMultiWithRetries(opList); } @@ -717,7 +676,7 @@ protected synchronized void removeRMDelegationTokenState( + rmDTIdentifier.getSequenceNumber()); } if (existsWithRetries(nodeRemovePath, false) != null) { - ArrayList opList = new ArrayList(); + ArrayList opList = new ArrayList<>(); opList.add(Op.delete(nodeRemovePath, -1)); doDeleteMultiWithRetries(opList); } else { @@ -729,7 +688,7 @@ protected synchronized void removeRMDelegationTokenState( protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - ArrayList opList = new ArrayList(); + ArrayList opList = new ArrayList<>(); String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); @@ -764,17 +723,18 @@ private void addStoreOrUpdateOps(ArrayList opList, if (isUpdate) { opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1)); } else { - opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, - CreateMode.PERSISTENT)); + opList + .add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, + CreateMode.PERSISTENT)); // Update Sequence number only while storing DT seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); - if (LOG.isDebugEnabled()) { - LOG.debug((isUpdate ? "Storing " : "Updating ") + - dtSequenceNumberPath + ". SequenceNumber: " - + rmDTIdentifier.getSequenceNumber()); - } - opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1)); } + if (LOG.isDebugEnabled()) { + LOG.debug((isUpdate ? "Storing " : "Updating ") + + dtSequenceNumberPath + ". SequenceNumber: " + + rmDTIdentifier.getSequenceNumber()); + } + opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1)); } finally { seqOs.close(); } @@ -832,76 +792,6 @@ public synchronized void removeApplication(ApplicationId removeAppId) } } - // ZK related code - /** - * Watcher implementation which forward events to the ZKRMStateStore This - * hides the ZK methods of the store from its public interface - */ - private final class ForwardingWatcher implements Watcher { - private ZooKeeper watchedZkClient; - - public ForwardingWatcher(ZooKeeper client) { - this.watchedZkClient = client; - } - - @Override - public void process(WatchedEvent event) { - try { - ZKRMStateStore.this.processWatchEvent(watchedZkClient, event); - } catch (Throwable t) { - LOG.error("Failed to process watcher event " + event + ": " - + StringUtils.stringifyException(t)); - } - } - } - - @VisibleForTesting - @Private - @Unstable - public synchronized void processWatchEvent(ZooKeeper zk, - WatchedEvent event) throws Exception { - // only process watcher event from current ZooKeeper Client session. - if (zk != activeZkClient) { - LOG.info("Ignore watcher event type: " + event.getType() + - " with state:" + event.getState() + " for path:" + - event.getPath() + " from old session"); - return; - } - - Event.EventType eventType = event.getType(); - LOG.info("Watcher event type: " + eventType + " with state:" - + event.getState() + " for path:" + event.getPath() + " for " + this); - - if (eventType == Event.EventType.None) { - - // the connection state has changed - switch (event.getState()) { - case SyncConnected: - LOG.info("ZKRMStateStore Session connected"); - if (zkClient == null) { - // the SyncConnected must be from the client that sent Disconnected - zkClient = activeZkClient; - ZKRMStateStore.this.notifyAll(); - LOG.info("ZKRMStateStore Session restored"); - } - break; - case Disconnected: - LOG.info("ZKRMStateStore Session disconnected"); - zkClient = null; - break; - case Expired: - // the connection got terminated because of session timeout - // call listener to reconnect - LOG.info("ZKRMStateStore Session expired"); - createConnection(); - break; - default: - LOG.error("Unexpected Zookeeper" + - " watch event state: " + event.getState()); - break; - } - } - } @VisibleForTesting @Private @@ -916,17 +806,11 @@ String getNodePath(String root, String nodeName) { */ private synchronized void doStoreMultiWithRetries( final List opList) throws Exception { - final List execOpList = new ArrayList(opList.size() + 2); + final List execOpList = new ArrayList<>(opList.size() + 2); execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); execOpList.add(deleteFencingNodePathOp); - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.multi(execOpList); - return null; - } - }.runWithRetries(); + curatorFramework.getZookeeperClient().getZooKeeper().multi(execOpList); } /** @@ -943,18 +827,11 @@ private void doStoreMultiWithRetries(final Op op) throws Exception { */ private synchronized void doDeleteMultiWithRetries( final List opList) throws Exception { - final List execOpList = new ArrayList(opList.size() + 2); + final List execOpList = new ArrayList<>(opList.size() + 2); execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); execOpList.add(deleteFencingNodePathOp); - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - setHasDeleteNodeOp(true); - zkClient.multi(execOpList); - return null; - } - }.runWithRetries(); + curatorFramework.getZookeeperClient().getZooKeeper().multi(execOpList); } private void doDeleteMultiWithRetries(final Op op) throws Exception { @@ -983,70 +860,27 @@ public void setDataWithRetries(final String path, final byte[] data, @Unstable public byte[] getDataWithRetries(final String path, final boolean watch) throws Exception { - return new ZKAction() { - @Override - public byte[] run() throws KeeperException, InterruptedException { - return zkClient.getData(path, watch, null); - } - }.runWithRetries(); + return curatorFramework.getData().forPath(path); } private List getACLWithRetries( final String path, final Stat stat) throws Exception { - return new ZKAction>() { - @Override - public List run() throws KeeperException, InterruptedException { - return zkClient.getACL(path, stat); - } - }.runWithRetries(); + return curatorFramework.getACL().forPath(path); } private List getChildrenWithRetries( final String path, final boolean watch) throws Exception { - return new ZKAction>() { - @Override - List run() throws KeeperException, InterruptedException { - return zkClient.getChildren(path, watch); - } - }.runWithRetries(); + return curatorFramework.getChildren().forPath(path); } private Stat existsWithRetries( final String path, final boolean watch) throws Exception { - return new ZKAction() { - @Override - Stat run() throws KeeperException, InterruptedException { - return zkClient.exists(path, watch); - } - }.runWithRetries(); + return curatorFramework.checkExists().forPath(path); } private void deleteWithRetries( final String path, final boolean watch) throws Exception { - new ZKAction() { - @Override - Void run() throws KeeperException, InterruptedException { - recursiveDeleteWithRetriesHelper(path, watch); - return null; - } - }.runWithRetries(); - } - - /** - * Helper method that deletes znodes recursively - */ - private void recursiveDeleteWithRetriesHelper(String path, boolean watch) - throws KeeperException, InterruptedException { - List children = zkClient.getChildren(path, watch); - for (String child : children) { - recursiveDeleteWithRetriesHelper(path + "/" + child, false); - } - - try { - zkClient.delete(path, -1); - } catch (KeeperException.NoNodeException nne) { - LOG.info("Node " + path + " doesn't exist to delete"); - } + curatorFramework.delete().deletingChildrenIfNeeded().forPath(path); } /** @@ -1078,118 +912,38 @@ public void run() { } } - private abstract class ZKAction { - private boolean hasDeleteNodeOp = false; - void setHasDeleteNodeOp(boolean hasDeleteOp) { - this.hasDeleteNodeOp = hasDeleteOp; - } - // run() expects synchronization on ZKRMStateStore.this - abstract T run() throws KeeperException, InterruptedException; - - T runWithCheck() throws Exception { - long startTime = System.currentTimeMillis(); - synchronized (ZKRMStateStore.this) { - while (zkClient == null) { - ZKRMStateStore.this.wait(zkSessionTimeout); - if (zkClient != null) { - break; - } - if (System.currentTimeMillis() - startTime > zkSessionTimeout) { - throw new IOException("Wait for ZKClient creation timed out"); - } - } - return run(); - } - } - - private boolean shouldRetry(Code code) { - switch (code) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - case SESSIONEXPIRED: - case SESSIONMOVED: - return true; - default: - break; - } - return false; - } - - T runWithRetries() throws Exception { - int retry = 0; - while (true) { - try { - return runWithCheck(); - } catch (KeeperException.NoAuthException nae) { - if (HAUtil.isHAEnabled(getConfig())) { - // NoAuthException possibly means that this store is fenced due to - // another RM becoming active. Even if not, - // it is safer to assume we have been fenced - throw new StoreFencedException(); - } - } catch (KeeperException ke) { - if (ke.code() == Code.NODEEXISTS) { - LOG.info("znode already exists!"); - return null; - } - if (hasDeleteNodeOp && ke.code() == Code.NONODE) { - LOG.info("znode has already been deleted!"); - return null; - } - - LOG.info("Exception while executing a ZK operation.", ke); - if (shouldRetry(ke.code()) && ++retry < numRetries) { - LOG.info("Retrying operation on ZK. Retry no. " + retry); - Thread.sleep(zkRetryInterval); - createConnection(); - continue; - } - LOG.info("Maxed out ZK retries. Giving up!"); - throw ke; - } - } - } - } - private synchronized void createConnection() throws IOException, InterruptedException { - closeZkClients(); - for (int retries = 0; retries < numRetries && zkClient == null; - retries++) { - try { - activeZkClient = getNewZooKeeper(); - zkClient = activeZkClient; - for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { - zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); - } - if (useDefaultFencingScheme) { - zkClient.addAuthInfo(zkRootNodeAuthScheme, - (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8"))); - } - } catch (IOException ioe) { - // Retry in case of network failures - LOG.info("Failed to connect to the ZooKeeper on attempt - " + - (retries + 1)); - ioe.printStackTrace(); - } - } - if (zkClient == null) { - LOG.error("Unable to connect to Zookeeper"); - throw new YarnRuntimeException("Unable to connect to Zookeeper"); - } - ZKRMStateStore.this.notifyAll(); - LOG.info("Created new ZK connection"); - } - - // protected to mock for testing - @VisibleForTesting - @Private - @Unstable - protected synchronized ZooKeeper getNewZooKeeper() - throws IOException, InterruptedException { - ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); - zk.register(new ForwardingWatcher(zk)); - return zk; + // Curator connection + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + builder = builder.connectString(zkHostPort) + .connectionTimeoutMs(zkSessionTimeout) + .retryPolicy( + new RetryNTimes(numRetries, zkSessionTimeout / numRetries)); + + // Set up authorization based on fencing scheme + List authInfos = new ArrayList<>(); + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); + } + if (useDefaultFencingScheme) { + byte[] defaultFencingAuth = + (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes( + Charset.forName("UTF-8")); + authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth)); + } + builder = builder.authorization(authInfos); + + // Connect to ZK + curatorFramework = builder.build(); + curatorFramework.start(); + if (!curatorFramework.blockUntilConnected( + zkSessionTimeout, TimeUnit.MILLISECONDS)) { + LOG.fatal("Couldn't establish connection to ZK server"); + throw new YarnRuntimeException("Couldn't connect to ZK server"); + } + + LOG.info("Connected to ZooKeeper"); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 4d0e560..9e0d22b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -40,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -76,7 +75,7 @@ import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.util.ConverterUtils; -public class RMStateStoreTestBase extends ClientBaseWithFixes{ +public class RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 333455c..351e7f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -18,18 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; - import javax.crypto.SecretKey; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -61,22 +57,45 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + public class TestZKRMStateStore extends RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); private static final int ZK_TIMEOUT_MS = 1000; + private TestingServer curatorTestingServer; + private CuratorFramework curatorFramework; + + @Before + public void setupCuratorServer() { + try { + curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + curatorFramework = CuratorFrameworkFactory.builder() + .connectString(curatorTestingServer.getConnectString()) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + + } catch (Exception e) { + LOG.fatal("Unable to start Curator TestingServer"); + throw new RuntimeException(e); + } + } class TestZKRMStateStoreTester implements RMStateStoreHelper { - ZooKeeper client; TestZKRMStateStoreInternal store; String workingZnode; + class TestZKRMStateStoreInternal extends ZKRMStateStore { public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) @@ -86,10 +105,10 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) assertTrue(znodeWorkingPath.equals(workingZnode)); } - @Override - public ZooKeeper getNewZooKeeper() throws IOException { - return client; - } +// @Override +// public ZooKeeper getNewZooKeeper() throws IOException { +// return client; +// } public String getVersionNode() { return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; @@ -117,23 +136,24 @@ public void testRetryingCreateRootDir() throws Exception { public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); workingZnode = "/jira/issue/3077/rmstore"; - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + curatorTestingServer.getConnectString()); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.client = createClient(); this.store = new TestZKRMStateStoreInternal(conf, workingZnode); return this.store; } @Override public boolean isFinalStateValid() throws Exception { - List nodes = client.getChildren(store.znodeWorkingPath, false); - return nodes.size() == 1; + return 1 == + curatorFramework.getChildren().forPath(store.znodeWorkingPath).size(); } @Override public void writeVersion(Version version) throws Exception { - client.setData(store.getVersionNode(), ((VersionPBImpl) version) - .getProto().toByteArray(), -1); + curatorFramework.setData().withVersion(-1) + .forPath(store.getVersionNode(), + ((VersionPBImpl) version).getProto().toByteArray()); } @Override @@ -142,10 +162,8 @@ public Version getCurrentVersion() throws Exception { } public boolean appExists(RMApp app) throws Exception { - Stat node = - client.exists(store.getAppNode(app.getApplicationId().toString()), - false); - return node !=null; + return null != curatorFramework.checkExists() + .forPath(store.getAppNode(app.getApplicationId().toString())); } } @@ -178,9 +196,9 @@ public Version getCurrentVersion() throws Exception { public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); workingZnode = "/jira/issue/3077/rmstore"; - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + curatorTestingServer.getConnectString()); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.client = createClient(); this.store = new TestZKRMStateStoreInternal(conf, workingZnode) { Version storedVersion = null; @@ -217,7 +235,8 @@ private Configuration createHARMConf( conf.set(YarnConfiguration.RM_HA_IDS, rmIds); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + curatorTestingServer.getConnectString()); conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); conf.set(YarnConfiguration.RM_HA_ID, rmId); conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java index 654b357..e270404 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; import javax.crypto.SecretKey; + +import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; @@ -73,10 +75,11 @@ private ZKRMStateStore store; private AMRMTokenSecretManager appTokenMgr; private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr; + private TestingServer curatorTestingServer; @Before public void setUpZKServer() throws Exception { - super.setUp(); + curatorTestingServer = new TestingServer(); } @After @@ -87,7 +90,7 @@ public void tearDown() throws Exception { if (appTokenMgr != null) { appTokenMgr.stop(); } - super.tearDown(); + curatorTestingServer.stop(); } private void initStore(String hostPort) { @@ -95,7 +98,8 @@ private void initStore(String hostPort) { RMContext rmContext = mock(RMContext.class); conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort)); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, + optHostPort.or(curatorTestingServer.getConnectString())); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); store = new ZKRMStateStore(); @@ -140,7 +144,7 @@ public int run(String[] args) { if (launchLocalZK) { try { - setUp(); + setUpZKServer(); } catch (Exception e) { System.err.println("failed to setup. : " + e.getMessage()); return -1; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java deleted file mode 100644 index 62dc5ef..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.ClientBaseWithFixes; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher; -import org.apache.hadoop.util.ZKUtil; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.security.NoSuchAlgorithmException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class TestZKRMStateStoreZKClientConnections extends - ClientBaseWithFixes { - - private static final int ZK_OP_WAIT_TIME = 3000; - private static final int ZK_TIMEOUT_MS = 1000; - private Log LOG = - LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class); - - private static final String DIGEST_USER_PASS="test-user:test-password"; - private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS; - private static final String DIGEST_USER_HASH; - static { - try { - DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest( - DIGEST_USER_PASS); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - } - private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda"; - - - class TestZKClient { - - ZKRMStateStore store; - boolean forExpire = false; - TestForwardingWatcher oldWatcher; - TestForwardingWatcher watcher; - CyclicBarrier syncBarrier = new CyclicBarrier(2); - - protected class TestZKRMStateStore extends ZKRMStateStore { - - public TestZKRMStateStore(Configuration conf, String workingZnode) - throws Exception { - init(conf); - start(); - assertTrue(znodeWorkingPath.equals(workingZnode)); - } - - @Override - public ZooKeeper getNewZooKeeper() - throws IOException, InterruptedException { - oldWatcher = watcher; - watcher = new TestForwardingWatcher(); - return createClient(watcher, hostPort, ZK_TIMEOUT_MS); - } - - @Override - public synchronized void processWatchEvent(ZooKeeper zk, - WatchedEvent event) throws Exception { - - if (forExpire) { - // a hack... couldn't find a way to trigger expired event. - WatchedEvent expriredEvent = new WatchedEvent( - Watcher.Event.EventType.None, - Watcher.Event.KeeperState.Expired, null); - super.processWatchEvent(zk, expriredEvent); - forExpire = false; - syncBarrier.await(); - } else { - super.processWatchEvent(zk, event); - } - } - } - - private class TestForwardingWatcher extends - ClientBaseWithFixes.CountdownWatcher { - public void process(WatchedEvent event) { - super.process(event); - try { - if (store != null) { - store.processWatchEvent(client, event); - } - } catch (Throwable t) { - LOG.error("Failed to process watcher event " + event + ": " - + StringUtils.stringifyException(t)); - } - } - } - - public RMStateStore getRMStateStore(Configuration conf) throws Exception { - String workingZnode = "/Test"; - conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); - conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.store = new TestZKRMStateStore(conf, workingZnode); - return this.store; - } - } - - @Test (timeout = 20000) - public void testZKClientRetry() throws Exception { - TestZKClient zkClientTester = new TestZKClient(); - final String path = "/test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - conf.setLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, 100); - final ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - TestDispatcher dispatcher = new TestDispatcher(); - store.setRMDispatcher(dispatcher); - final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); - - stopServer(); - Thread clientThread = new Thread() { - @Override - public void run() { - try { - store.getDataWithRetries(path, true); - } catch (Exception e) { - e.printStackTrace(); - assertionFailedInThread.set(true); - } - } - }; - Thread.sleep(2000); - startServer(); - clientThread.join(); - Assert.assertFalse(assertionFailedInThread.get()); - } - - @Test(timeout = 20000) - public void testZKClientDisconnectAndReconnect() - throws Exception { - - TestZKClient zkClientTester = new TestZKClient(); - String path = "/test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - TestDispatcher dispatcher = new TestDispatcher(); - store.setRMDispatcher(dispatcher); - - // trigger watch - store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - store.getDataWithRetries(path, true); - store.setDataWithRetries(path, "newBytes".getBytes(), 0); - - stopServer(); - zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME); - try { - store.getDataWithRetries(path, true); - fail("Expected ZKClient time out exception"); - } catch (Exception e) { - assertTrue(e.getMessage().contains( - "Wait for ZKClient creation timed out")); - } - - // ZKRMStateStore Session restored - startServer(); - zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME); - byte[] ret = null; - try { - ret = store.getDataWithRetries(path, true); - } catch (Exception e) { - String error = "ZKRMStateStore Session restore failed"; - LOG.error(error, e); - fail(error); - } - assertEquals("newBytes", new String(ret)); - } - - @Test(timeout = 20000) - public void testZKSessionTimeout() throws Exception { - - TestZKClient zkClientTester = new TestZKClient(); - String path = "/test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - TestDispatcher dispatcher = new TestDispatcher(); - store.setRMDispatcher(dispatcher); - - // a hack to trigger expired event - zkClientTester.forExpire = true; - - // trigger watch - store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - store.getDataWithRetries(path, true); - store.setDataWithRetries(path, "bytes".getBytes(), 0); - - zkClientTester.syncBarrier.await(); - // after this point, expired event has already been processed. - - try { - byte[] ret = store.getDataWithRetries(path, false); - assertEquals("bytes", new String(ret)); - } catch (Exception e) { - String error = "New session creation failed"; - LOG.error(error, e); - fail(error); - } - - // send Disconnected event from old client session to ZKRMStateStore - // check the current client session is not affected. - Assert.assertTrue(zkClientTester.oldWatcher != null); - WatchedEvent disconnectedEvent = new WatchedEvent( - Watcher.Event.EventType.None, - Watcher.Event.KeeperState.Disconnected, null); - zkClientTester.oldWatcher.process(disconnectedEvent); - Assert.assertTrue(store.zkClient != null); - - zkClientTester.watcher.process(disconnectedEvent); - Assert.assertTrue(store.zkClient == null); - WatchedEvent connectedEvent = new WatchedEvent( - Watcher.Event.EventType.None, - Watcher.Event.KeeperState.SyncConnected, null); - zkClientTester.watcher.process(connectedEvent); - Assert.assertTrue(store.zkClient != null); - Assert.assertTrue(store.zkClient == store.activeZkClient); - } - - @Test(timeout = 20000) - public void testSetZKAcl() { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca"); - try { - zkClientTester.store.zkClient.delete(zkClientTester.store - .znodeWorkingPath, -1); - fail("Shouldn't be able to delete path"); - } catch (Exception e) {/* expected behavior */ - } - } - - @Test(timeout = 20000) - public void testInvalidZKAclConfiguration() { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*"); - try { - zkClientTester.getRMStateStore(conf); - fail("ZKRMStateStore created with bad ACL"); - } catch (ZKUtil.BadAclFormatException bafe) { - // expected behavior - } catch (Exception e) { - String error = "Incorrect exception on BadAclFormat"; - LOG.error(error, e); - fail(error); - } - } - - @Test - public void testZKAuths() throws Exception { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1); - conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS); - conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL); - conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD); - - zkClientTester.getRMStateStore(conf); - } - - @Test - public void testZKRetryInterval() throws Exception { - TestZKClient zkClientTester = new TestZKClient(); - YarnConfiguration conf = new YarnConfiguration(); - - ZKRMStateStore store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS, - store.zkRetryInterval); - store.stop(); - - conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); - store = - (ZKRMStateStore) zkClientTester.getRMStateStore(conf); - assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS / - YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES, - store.zkRetryInterval); - store.stop(); - } -}