diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKManager.java
new file mode 100644
index 00000000000..bcc2f6bab51
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKManager.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.util.curator;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Helper class that provides utility methods specific to ZK operations.
+ */
+@InterfaceAudience.Private
+public final class ZKManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZKManager.class);
+
+ // Configuration keys
+ public static final String ZK_PREFIX = "zk.";
+ public static final String ZK_ACL = ZK_PREFIX + "acl";
+ public static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+ public static final String ZK_AUTH = ZK_PREFIX + "auth";
+
+ public static final String ZK_ADDRESS = ZK_PREFIX + "address";
+ public static final String ZK_NUM_RETRIES = ZK_PREFIX + "num-retries";
+ public static final int ZK_NUM_RETRIES_DEFAULT = 1000;
+ public static final String ZK_TIMEOUT_MS = ZK_PREFIX + "timeout-ms";
+ public static final int ZK_TIMEOUT_MS_DEFAULT = 10000;
+ public static final String ZK_RETRY_INTERVAL_MS = ZK_PREFIX + "retry-interval-ms";
+ public static final int ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
+
+ /** Password for the root node in ZooKeeper. */
+ private final String zkRootNodePassword;
+ /** ACL and auth info. */
+ private List<ACL> zkAcl;
+
+ @VisibleForTesting
+ protected CuratorFramework curatorFramework;
+
+ public ZKManager() throws IOException {
+ zkRootNodePassword =
+ Long.toString(new SecureRandom().nextLong());
+ }
+
+ public String getZkRootNodePassword() {
+ return zkRootNodePassword;
+ }
+
+ public CuratorFramework getCurator() {
+ return curatorFramework;
+ }
+
+ public void close() {
+ if (curatorFramework != null) {
+ curatorFramework.close();
+ }
+ }
+
+ /**
+ * Utility method to fetch the ZK ACLs from the configuration.
+ *
+ * @throws java.io.IOException if the Zookeeper ACLs configuration file
+ * cannot be read
+ */
+ public static List<ACL> getZKAcls(Configuration conf) throws IOException {
+ // Parse authentication from configuration.
+ String zkAclConf = conf.get(ZK_ACL, ZK_ACL_DEFAULT);
+ try {
+ zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+ return ZKUtil.parseACLs(zkAclConf);
+ } catch (IOException | ZKUtil.BadAclFormatException e) {
+ LOG.error("Couldn't read ACLs based on " + ZK_ACL);
+ throw e;
+ }
+ }
+
+ /**
+ * Utility method to fetch ZK auth info from the configuration.
+ *
+ * @throws java.io.IOException if the Zookeeper ACLs configuration file
+ * cannot be read
+ */
+ public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
+ throws IOException {
+ String zkAuthConf = conf.get(ZK_AUTH);
+ try {
+ zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
+ if (zkAuthConf != null) {
+ return ZKUtil.parseAuth(zkAuthConf);
+ } else {
+ return Collections.emptyList();
+ }
+ } catch (IOException | ZKUtil.BadAuthFormatException e) {
+ LOG.error("Couldn't read Auth based on " + ZK_AUTH);
+ throw e;
+ }
+ }
+
+ public void createAndStartCurator(Configuration conf) throws IOException {
+ this.createAndStartCurator(conf, new ArrayList<>());
+ }
+
+ public void createAndStartCurator(
+ Configuration conf, List<AuthInfo> authInfos) throws IOException {
+ String zkHostPort = conf.get(ZK_ADDRESS);
+ if (zkHostPort == null) {
+ throw new IOException(ZK_ADDRESS + " is not configured.");
+ }
+ int numRetries = conf.getInt(ZK_NUM_RETRIES, ZK_NUM_RETRIES_DEFAULT);
+ int zkSessionTimeout = conf.getInt(ZK_TIMEOUT_MS, ZK_TIMEOUT_MS_DEFAULT);
+ int zkRetryInterval =
+ conf.getInt(ZK_RETRY_INTERVAL_MS, ZK_RETRY_INTERVAL_MS_DEFAULT);
+
+ // ACLs to use for the znodes created through this manager
+ this.zkAcl = getZKAcls(conf);
+
+ // Set up ZK auths
+ List<ZKUtil.ZKAuthInfo> zkAuths = getZKAuths(conf);
+ if (authInfos == null) {
+ authInfos = new ArrayList<>();
+ }
+ for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+ authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+ }
+
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkHostPort)
+ .sessionTimeoutMs(zkSessionTimeout)
+ .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
+ .authorization(authInfos).build();
+ client.start();
+ this.curatorFramework = client;
+ }
+
+ /**
+ * Get the ACLs for a ZNode.
+ * @param path Path of the ZNode.
+ * @return The list of ACLs for the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public List<ACL> getACL(final String path) throws Exception {
+ return curatorFramework.getACL().forPath(path);
+ }
+
+ /**
+ * Get the data in a ZNode.
+ * @param path Path of the ZNode.
+ * @return The data in the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public byte[] getData(final String path) throws Exception {
+ return curatorFramework.getData().forPath(path);
+ }
+
+ /**
+ * Get the data in a ZNode as a String.
+ * @param path Path of the ZNode.
+ * @return The data in the ZNode as a UTF-8 String.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public String getStringData(final String path) throws Exception {
+ byte[] bytes = getData(path);
+ return new String(bytes, Charset.forName("UTF-8"));
+ }
+
+ /**
+ * Set data into a ZNode.
+ * @param path Path of the ZNode.
+ * @param data Data to set.
+ * @param version Version of the data to store.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public void setData(String path, byte[] data, int version) throws Exception {
+ curatorFramework.setData().withVersion(version).forPath(path, data);
+ }
+
+ /**
+ * Set data into a ZNode.
+ * @param path Path of the ZNode.
+ * @param data Data to set as String.
+ * @param version Version of the data to store.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public void setData(String path, String data, int version) throws Exception {
+ byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
+ setData(path, bytes, version);
+ }
+
+ /**
+ * Get children of a ZNode.
+ * @param path Path of the ZNode.
+ * @return The list of children.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public List<String> getChildren(final String path) throws Exception {
+ return curatorFramework.getChildren().forPath(path);
+ }
+
+ /**
+ * Check if a ZNode exists.
+ *
+ * @param path Path of the ZNode.
+ * @return If the ZNode exists.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public boolean exists(final String path) throws Exception {
+ return curatorFramework.checkExists().forPath(path) != null;
+ }
+
+ /**
+ * Create a ZNode.
+ * @param path Path of the ZNode.
+ * @return If the ZNode was created.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public boolean create(final String path) throws Exception {
+ boolean created = false;
+ if (!exists(path)) {
+ curatorFramework.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(path, null);
+ created = true;
+ }
+ return created;
+ }
+
+ /**
+ * Delete a ZNode.
+ * @param path Path of the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public void delete(final String path) throws Exception {
+ if (exists(path)) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+ }
+
+ /**
+ * Get the path for a ZNode.
+ * @param root Root of the ZNode.
+ * @param nodeName Name of the ZNode.
+ * @return Path for the ZNode.
+ */
+ public static String getNodePath(String root, String nodeName) {
+ return (root + "/" + nodeName);
+ }
+}
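For reference, a minimal usage sketch of the ZKManager class introduced above. This is not part of the patch: the ZooKeeper connect string and the /example znode are placeholder values, and it assumes an ensemble is reachable at that address.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.curator.ZKManager;

    public class ZKManagerExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder connect string; point this at a real ZooKeeper quorum.
        conf.set(ZKManager.ZK_ADDRESS, "localhost:2181");

        ZKManager zkManager = new ZKManager();
        zkManager.createAndStartCurator(conf);
        try {
          // Create the znode if it is missing, write a string, read it back.
          zkManager.create("/example");
          zkManager.setData("/example", "hello", -1);
          System.out.println(zkManager.getStringData("/example"));
        } finally {
          zkManager.close();
        }
      }
    }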
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKManager.java
new file mode 100644
index 00000000000..9eb2257ae92
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKManager.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import static org.junit.Assert.*;
+
+import java.nio.charset.Charset;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestZKManager {
+
+ protected TestingServer server;
+ protected ZKManager zkManager;
+
+ @Before
+ public void setup() throws Exception {
+ this.server = new TestingServer();
+
+ Configuration conf = new Configuration();
+ conf.set(ZKManager.ZK_ADDRESS, this.server.getConnectString());
+
+ this.zkManager = new ZKManager();
+ this.zkManager.createAndStartCurator(conf);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ this.zkManager.close();
+ if (this.server != null) {
+ this.server.close();
+ this.server = null;
+ }
+ }
+
+ @Test
+ public void testReadWriteData() throws Exception {
+ String testZNode = "/test";
+ String expectedString = "testString";
+ zkManager.create(testZNode);
+ zkManager.setData(testZNode, expectedString, -1);
+ String testString = zkManager.getStringData(testZNode);
+ assertEquals(expectedString, testString);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 71a71346321..8f78b6b5fa0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.BasicDiskValidator;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.curator.ZKManager;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@Public
@@ -87,7 +88,13 @@ private static void addDeprecatedKeys() {
});
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
- SYSTEM_METRICS_PUBLISHER_ENABLED)
+ SYSTEM_METRICS_PUBLISHER_ENABLED),
+ new DeprecationDelta(RM_ZK_ACL, ZKManager.ZK_ACL),
+ new DeprecationDelta(RM_ZK_AUTH, ZKManager.ZK_AUTH),
+ new DeprecationDelta(RM_ZK_ADDRESS, ZKManager.ZK_ADDRESS),
+ new DeprecationDelta(RM_ZK_NUM_RETRIES, ZKManager.ZK_NUM_RETRIES),
+ new DeprecationDelta(RM_ZK_TIMEOUT_MS, ZKManager.ZK_TIMEOUT_MS),
+ new DeprecationDelta(RM_ZK_RETRY_INTERVAL_MS, ZKManager.ZK_RETRY_INTERVAL_MS),
});
}
@@ -2579,7 +2586,7 @@ public static boolean isAclEnabled(Configuration conf) {
FEDERATION_PREFIX + "state-store.class";
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
- "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
+ "org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore";
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
@@ -2618,6 +2625,14 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
+ public static final String FEDERATION_STATESTORE_ZK_PREFIX =
+ FEDERATION_PREFIX + "zk-state-store.";
+ /** Parent znode path under which the ZK federation state store creates znodes. */
+ public static final String FEDERATION_STATESTORE_RM_PARENT_PATH =
+ FEDERATION_STATESTORE_ZK_PREFIX + "parent-path";
+ public static final String DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH =
+ "/federationstore";
+
private static final String FEDERATION_STATESTORE_SQL_PREFIX =
FEDERATION_PREFIX + "state-store.sql.";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 5f8509764e8..2418f3fd116 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -130,6 +130,15 @@
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..5f60914b925
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -0,0 +1,653 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import static org.apache.hadoop.util.curator.ZKManager.getNodePath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKManager;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+
+/**
+ * ZooKeeper implementation of {@link FederationStateStore}.
+ *
+ * The znode structure is as follows:
+ * ROOT_DIR_PATH
+ * |--- MEMBERSHIP
+ * | |----- SC1
+ * | |----- SC2
+ * |--- APPLICATION
+ * | |----- APP1
+ * | |----- APP2
+ * |--- POLICY
+ * |----- QUEUE1
+ *     |----- QUEUE2
+ */
+public class ZookeeperFederationStateStore implements FederationStateStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
+
+ private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
+ private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
+ private final static String ROOT_ZNODE_NAME_POLICY = "policies";
+
+ /** Interface to Zookeeper. */
+ private ZKManager zkManager;
+
+ /** Directory to store the state store data. */
+ private String baseZNode;
+
+ private String appsZNode;
+ private String membershipZNode;
+ private String policiesZNode;
+
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ LOG.info("Initializing ZooKeeper connection");
+
+ baseZNode = conf.get(
+ YarnConfiguration.FEDERATION_STATESTORE_RM_PARENT_PATH,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH);
+ try {
+ this.zkManager = new ZKManager();
+ this.zkManager.createAndStartCurator(conf);
+ } catch (IOException e) {
+ LOG.error("Cannot initialize the ZK connection", e);
+ }
+
+ // Base znodes
+ membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+ appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+
+ // Create base znode for each entity
+ try {
+ createRootDirRecursively(membershipZNode);
+ createRootDirRecursively(appsZNode);
+ createRootDirRecursively(policiesZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot create base directories: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (zkManager != null) {
+ zkManager.close();
+ }
+ }
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+ ApplicationId appId = app.getApplicationId();
+
+ // Try to write the subcluster
+ SubClusterId homeSubCluster = app.getHomeSubCluster();
+ try {
+ putApp(appId, homeSubCluster, false);
+ } catch (Exception e) {
+ String errMsg = "Cannot add application home subcluster for " + appId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ // Check for the actual subcluster
+ try {
+ homeSubCluster = getApp(appId);
+ } catch (Exception e) {
+ String errMsg = "Cannot check app home subcluster for " + appId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return AddApplicationHomeSubClusterResponse
+ .newInstance(homeSubCluster);
+ }
+
+ @Override
+ public UpdateApplicationHomeSubClusterResponse
+ updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest request)
+ throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+ ApplicationId appId = app.getApplicationId();
+ SubClusterId homeSubCluster = getApp(appId);
+ if (homeSubCluster == null) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ SubClusterId newSubClusterId =
+ request.getApplicationHomeSubCluster().getHomeSubCluster();
+ putApp(appId, newSubClusterId, true);
+ return UpdateApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationId appId = request.getApplicationId();
+ SubClusterId homeSubCluster = getApp(appId);
+ if (homeSubCluster == null) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetApplicationHomeSubClusterResponse.newInstance(
+ ApplicationHomeSubCluster.newInstance(appId, homeSubCluster));
+ }
+
+ @Override
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest request) throws YarnException {
+ List<ApplicationHomeSubCluster> result = new ArrayList<>();
+
+ try {
+ for (String child : zkManager.getChildren(appsZNode)) {
+ ApplicationId appId = ApplicationId.fromString(child);
+ SubClusterId homeSubCluster = getApp(appId);
+ ApplicationHomeSubCluster app =
+ ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
+ result.add(app);
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get apps: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return GetApplicationsHomeSubClusterResponse.newInstance(result);
+ }
+
+ @Override
+ public DeleteApplicationHomeSubClusterResponse
+ deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest request)
+ throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationId appId = request.getApplicationId();
+ String appZNode = getNodePath(appsZNode, appId.toString());
+
+ boolean exists = false;
+ try {
+ exists = zkManager.exists(appZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot check app: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ if (!exists) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ try {
+ zkManager.delete(appZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot delete app: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return DeleteApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterRegisterResponse registerSubCluster(
+ SubClusterRegisterRequest request) throws YarnException {
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterInfo subClusterInfo = request.getSubClusterInfo();
+ SubClusterId subclusterId = subClusterInfo.getSubClusterId();
+
+ // Update the heartbeat time
+ long currentTime = getCurrentTime();
+ subClusterInfo.setLastHeartBeat(currentTime);
+
+ try {
+ putSubclusterInfo(subclusterId, subClusterInfo, true);
+ } catch (Exception e) {
+ String errMsg = "Cannot register subcluster: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return SubClusterRegisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterDeregisterResponse deregisterSubCluster(
+ SubClusterDeregisterRequest request) throws YarnException {
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterId subClusterId = request.getSubClusterId();
+ SubClusterState state = request.getState();
+
+ // Get the current information and update it
+ SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId);
+ if (subClusterInfo == null) {
+ String errMsg = "SubCluster " + subClusterId + " not found";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ } else {
+ subClusterInfo.setState(state);
+ putSubclusterInfo(subClusterId, subClusterInfo, true);
+ }
+
+ return SubClusterDeregisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
+ SubClusterHeartbeatRequest request) throws YarnException {
+
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterId subClusterId = request.getSubClusterId();
+
+ SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId);
+ if (subClusterInfo == null) {
+ String errMsg = "SubCluster " + subClusterId
+ + " does not exist; cannot heartbeat";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ long currentTime = getCurrentTime();
+ subClusterInfo.setLastHeartBeat(currentTime);
+ subClusterInfo.setState(request.getState());
+ subClusterInfo.setCapability(request.getCapability());
+
+ putSubclusterInfo(subClusterId, subClusterInfo, true);
+
+ return SubClusterHeartbeatResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterInfoResponse getSubCluster(
+ GetSubClusterInfoRequest request) throws YarnException {
+
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterId subClusterId = request.getSubClusterId();
+ SubClusterInfo subClusterInfo = null;
+ try {
+ subClusterInfo = getSubclusterInfo(subClusterId);
+ if (subClusterInfo == null) {
+ LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
+ return null;
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get subcluster: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetSubClusterInfoResponse.newInstance(subClusterInfo);
+ }
+
+ @Override
+ public GetSubClustersInfoResponse getSubClusters(
+ GetSubClustersInfoRequest request) throws YarnException {
+ List<SubClusterInfo> result = new ArrayList<>();
+
+ try {
+ for (String child : zkManager.getChildren(membershipZNode)) {
+ SubClusterId subClusterId = SubClusterId.newInstance(child);
+ SubClusterInfo info = getSubclusterInfo(subClusterId);
+ if (!request.getFilterInactiveSubClusters() ||
+ info.getState().isActive()) {
+ result.add(info);
+ }
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get subclusters: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetSubClustersInfoResponse.newInstance(result);
+ }
+
+
+ @Override
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+ FederationPolicyStoreInputValidator.validate(request);
+ String queue = request.getQueue();
+ SubClusterPolicyConfiguration policy = null;
+ try {
+ policy = getPolicy(queue);
+ } catch (Exception e) {
+ String errMsg = "Cannot get policy: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ if (policy == null) {
+ LOG.warn("Policy for queue: {} does not exist.", queue);
+ return null;
+ }
+ return GetSubClusterPolicyConfigurationResponse
+ .newInstance(policy);
+ }
+
+ @Override
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+ FederationPolicyStoreInputValidator.validate(request);
+ SubClusterPolicyConfiguration policy =
+ request.getPolicyConfiguration();
+ try {
+ String queue = policy.getQueue();
+ putPolicy(queue, policy, true);
+ } catch (Exception e) {
+ String errMsg = "Cannot set policy: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return SetSubClusterPolicyConfigurationResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+ List<SubClusterPolicyConfiguration> result = new ArrayList<>();
+
+ try {
+ for (String child : zkManager.getChildren(policiesZNode)) {
+ SubClusterPolicyConfiguration policy = getPolicy(child);
+ result.add(policy);
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get policies: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
+ }
+
+ @Override
+ public Version getCurrentVersion() {
+ return null;
+ }
+
+ @Override
+ public Version loadVersion() {
+ return null;
+ }
+
+ /**
+ * Get the subcluster for an application.
+ * @param appId Application identifier.
+ * @return Subcluster identifier.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private SubClusterId getApp(final ApplicationId appId) throws YarnException {
+ String appZNode = getNodePath(appsZNode, appId.toString());
+
+ SubClusterId subClusterId = null;
+ byte[] data = get(appZNode);
+ if (data != null) {
+ try {
+ subClusterId = new SubClusterIdPBImpl(
+ SubClusterIdProto.parseFrom(data));
+ } catch (InvalidProtocolBufferException e) {
+ String errMsg = "Cannot parse application at " + appZNode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+ return subClusterId;
+ }
+
+ /**
+ * Put an application mapping.
+ * @param appId Application identifier.
+ * @param subClusterId Subcluster identifier.
+ * @param update If an existing mapping should be updated.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private void putApp(final ApplicationId appId,
+ final SubClusterId subClusterId, boolean update)
+ throws YarnException {
+ String appZNode = getNodePath(appsZNode, appId.toString());
+ SubClusterIdProto proto =
+ ((SubClusterIdPBImpl)subClusterId).getProto();
+ byte[] data = proto.toByteArray();
+ put(appZNode, data, update);
+ }
+
+ /**
+ * Get the current information for a subcluster from Zookeeper.
+ * @param subclusterId Subcluster identifier.
+ * @return Subcluster information or null if it doesn't exist.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
+ throws YarnException {
+ String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+
+ SubClusterInfo subClusterInfo = null;
+ byte[] data = get(memberZNode);
+ if (data != null) {
+ try {
+ subClusterInfo = new SubClusterInfoPBImpl(
+ SubClusterInfoProto.parseFrom(data));
+ } catch (InvalidProtocolBufferException e) {
+ String errMsg = "Cannot parse subcluster info at " + memberZNode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+ return subClusterInfo;
+ }
+
+ /**
+ * Put the subcluster information in Zookeeper.
+ * @param subclusterId Subcluster identifier.
+ * @param subClusterInfo Subcluster information.
+ * @param update If the existing information should be updated.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private void putSubclusterInfo(final SubClusterId subclusterId,
+ final SubClusterInfo subClusterInfo, final boolean update)
+ throws YarnException {
+ String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+ SubClusterInfoProto proto =
+ ((SubClusterInfoPBImpl)subClusterInfo).getProto();
+ byte[] data = proto.toByteArray();
+ put(memberZNode, data, update);
+ }
+
+ /**
+ * Get the queue policy from Zookeeper.
+ * @param queue Name of the queue.
+ * @return Subcluster policy configuration.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private SubClusterPolicyConfiguration getPolicy(final String queue)
+ throws YarnException {
+ String policyZNode = getNodePath(policiesZNode, queue);
+
+ SubClusterPolicyConfiguration policy = null;
+ byte[] data = get(policyZNode);
+ if (data != null) {
+ try {
+ policy = new SubClusterPolicyConfigurationPBImpl(
+ SubClusterPolicyConfigurationProto.parseFrom(data));
+ } catch (InvalidProtocolBufferException e) {
+ String errMsg = "Cannot parse policy at " + policyZNode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+ return policy;
+ }
+
+ /**
+ * Put the policy information for a queue in Zookeeper.
+ * @param queue Name of the queue.
+ * @param policy Subcluster policy configuration.
+ * @param update If an existing policy should be updated.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private void putPolicy(final String queue,
+ final SubClusterPolicyConfiguration policy, boolean update)
+ throws YarnException {
+ String policyZNode = getNodePath(policiesZNode, queue);
+
+ SubClusterPolicyConfigurationProto proto =
+ ((SubClusterPolicyConfigurationPBImpl)policy).getProto();
+ byte[] data = proto.toByteArray();
+ put(policyZNode, data, update);
+ }
+
+ /**
+ * Get data from a znode in Zookeeper.
+ * @param znode Path of the znode.
+ * @return Data in the znode.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private byte[] get(String znode) throws YarnException {
+ boolean exists = false;
+ try {
+ exists = zkManager.exists(znode);
+ } catch (Exception e) {
+ String errMsg = "Cannot find znode " + znode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ if (!exists) {
+ LOG.error("{} does not exist", znode);
+ return null;
+ }
+
+ byte[] data = null;
+ try {
+ data = zkManager.getData(znode);
+ } catch (Exception e) {
+ String errMsg = "Cannot get data from znode " + znode
+ + ": " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return data;
+ }
+
+ /**
+ * Put data into a znode in Zookeeper.
+ * @param znode Path of the znode.
+ * @param data Data to write.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private void put(String znode, byte[] data, boolean update)
+ throws YarnException {
+ // Create the znode
+ boolean created = false;
+ try {
+ created = zkManager.create(znode);
+ } catch (Exception e) {
+ String errMsg = "Cannot create znode " + znode + ": " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ if (!created) {
+ LOG.debug("{} not created", znode);
+ if (!update) {
+ LOG.info("{} already existed and we are not updating", znode);
+ return;
+ }
+ }
+
+ // Write the data into the znode
+ try {
+ zkManager.setData(znode, data, -1);
+ } catch (Exception e) {
+ String errMsg = "Cannot write data into znode " + znode
+ + ": " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+
+ /**
+ * Utility function to ensure that the configured base znode exists.
+ * This recursively creates the znode as well as all of its parents.
+ */
+ private void createRootDirRecursively(final String path) throws Exception {
+ String[] pathParts = path.split("/");
+ Preconditions.checkArgument(
+ pathParts.length >= 1 && pathParts[0].isEmpty(),
+ "Invalid path: %s", path);
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 1; i < pathParts.length; i++) {
+ sb.append("/").append(pathParts[i]);
+ zkManager.create(sb.toString());
+ }
+ }
+
+ /**
+ * Get the current time.
+ * @return Current time in milliseconds.
+ */
+ private static long getCurrentTime() {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ return cal.getTimeInMillis();
+ }
+}
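A quick, hedged sketch of exercising the store above against a running ZooKeeper ensemble (not part of the patch). The connect string, application id, and subcluster name are placeholders, and the record factory methods (ApplicationId.newInstance, SubClusterId.newInstance, ApplicationHomeSubCluster.newInstance and the request wrappers) are assumed to be the existing federation record APIs.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.curator.ZKManager;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore;
    import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
    import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
    import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
    import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

    public class ZKFederationStoreExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder ensemble address.
        conf.set(ZKManager.ZK_ADDRESS, "localhost:2181");

        ZookeeperFederationStateStore store = new ZookeeperFederationStateStore();
        store.init(conf);
        try {
          // Map an application to its home subcluster and read it back.
          ApplicationId appId =
              ApplicationId.newInstance(System.currentTimeMillis(), 1);
          SubClusterId home = SubClusterId.newInstance("SC-1");
          store.addApplicationHomeSubCluster(
              AddApplicationHomeSubClusterRequest.newInstance(
                  ApplicationHomeSubCluster.newInstance(appId, home)));
          SubClusterId stored = store.getApplicationHomeSubCluster(
              GetApplicationHomeSubClusterRequest.newInstance(appId))
              .getApplicationHomeSubCluster().getHomeSubCluster();
          System.out.println(appId + " -> " + stored);
        } finally {
          store.close();
        }
      }
    }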
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..9617df8d44c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ZookeeperFederationStateStore.
+ */
+public class TestZookeeperFederationStateStore
+ extends FederationStateStoreBaseTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestZookeeperFederationStateStore.class);
+
+ /** Zookeeper test server. */
+ private static TestingServer curatorTestingServer;
+ private static CuratorFramework curatorFramework;
+
+ @Before
+ public void before() throws IOException, YarnException {
+ try {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ String connectString = curatorTestingServer.getConnectString();
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(connectString)
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, connectString);
+ setConf(conf);
+ } catch (Exception e) {
+ LOG.error("Cannot initialize ZooKeeper store", e);
+ throw new IOException(e);
+ }
+
+ super.before();
+ }
+
+ @After
+ public void after() throws Exception {
+ super.after();
+
+ curatorFramework.close();
+ try {
+ curatorTestingServer.stop();
+ } catch (IOException e) {
+ // Ignore errors stopping the ZooKeeper test server during teardown
+ }
+ }
+
+ @Override
+ protected FederationStateStore createStateStore() {
+ Configuration conf = new Configuration();
+ super.setConf(conf);
+ return new ZookeeperFederationStateStore();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
index a8dcda4f797..f828e55ab3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -96,8 +97,8 @@ protected void serviceInit(Configuration conf)
zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
- List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+ List<ACL> zkAcls = ZKManager.getZKAcls(conf);
+ List<ZKUtil.ZKAuthInfo> zkAuths = ZKManager.getZKAuths(conf);
int maxRetryNum =
conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
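For context on the two static helpers now called here, a small sketch (not part of the patch; the ACL and digest auth strings are placeholder values, and both keys also accept an @/path/to/file indirection resolved through ZKUtil.resolveConfIndirection):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ZKUtil;
    import org.apache.hadoop.util.curator.ZKManager;
    import org.apache.zookeeper.data.ACL;

    public class ZKAclAuthExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder ACL and auth values in the scheme:id:perms and
        // scheme:auth formats parsed by ZKUtil.
        conf.set(ZKManager.ZK_ACL, "world:anyone:rwcda");
        conf.set(ZKManager.ZK_AUTH, "digest:admin:password");

        List<ACL> acls = ZKManager.getZKAcls(conf);
        List<ZKUtil.ZKAuthInfo> auths = ZKManager.getZKAuths(conf);
        System.out.println(acls.size() + " ACLs, " + auths.size() + " auths");
      }
    }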
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
deleted file mode 100644
index 4b8561dae15..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ZKUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.data.ACL;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Helper class that provides utility methods specific to ZK operations
- */
-@InterfaceAudience.Private
-public class RMZKUtils {
- private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
-
- /**
- * Utility method to fetch the ZK ACLs from the configuration.
- *
- * @throws java.io.IOException if the Zookeeper ACLs configuration file
- * cannot be read
- */
- public static List<ACL> getZKAcls(Configuration conf) throws IOException {
- // Parse authentication from configuration.
- String zkAclConf =
- conf.get(YarnConfiguration.RM_ZK_ACL,
- YarnConfiguration.DEFAULT_RM_ZK_ACL);
- try {
- zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
- return ZKUtil.parseACLs(zkAclConf);
- } catch (IOException | ZKUtil.BadAclFormatException e) {
- LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
- throw e;
- }
- }
-
- /**
- * Utility method to fetch ZK auth info from the configuration.
- *
- * @throws java.io.IOException if the Zookeeper ACLs configuration file
- * cannot be read
- */
- public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
- throws IOException {
- String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
- try {
- zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
- if (zkAuthConf != null) {
- return ZKUtil.parseAuth(zkAuthConf);
- } else {
- return Collections.emptyList();
- }
- } catch (IOException | ZKUtil.BadAuthFormatException e) {
- LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
- throw e;
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 969188529e8..eb2261e1ac2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKManager;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -192,9 +193,7 @@
protected ResourceTrackerService resourceTracker;
private JvmMetrics jvmMetrics;
private boolean curatorEnabled = false;
- private CuratorFramework curator;
- private final String zkRootNodePassword =
- Long.toString(new SecureRandom().nextLong());
+ private ZKManager zkManager;
private boolean recoveryEnabled;
@VisibleForTesting
@@ -345,7 +344,23 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
- this.curator = createAndStartCurator(conf);
+ this.zkManager = new ZKManager();
+
+ // Additional ZK auths
+ List<AuthInfo> authInfos = new ArrayList<>();
+ if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
+ YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
+ String zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
+ YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
+ String zkRootNodePassword = this.zkManager.getZkRootNodePassword();
+ byte[] defaultFencingAuth =
+ (zkRootNodeUsername + ":" + zkRootNodePassword)
+ .getBytes(Charset.forName("UTF-8"));
+ authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
+ defaultFencingAuth));
+ }
+ this.zkManager.createAndStartCurator(conf, authInfos);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
@@ -353,57 +368,14 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
return elector;
}
- public CuratorFramework createAndStartCurator(Configuration conf)
- throws IOException {
- String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
- if (zkHostPort == null) {
- throw new YarnRuntimeException(
- YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
- }
- int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
- YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
- int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
- YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
-
- // set up zk auths
- List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
- List<AuthInfo> authInfos = new ArrayList<>();
- for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
- authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
- }
-
- if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
- YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
- String zkRootNodeUsername = HAUtil
- .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
- byte[] defaultFencingAuth =
- (zkRootNodeUsername + ":" + zkRootNodePassword)
- .getBytes(Charset.forName("UTF-8"));
- authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
- defaultFencingAuth));
- }
-
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(zkHostPort)
- .sessionTimeoutMs(zkSessionTimeout)
- .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
- .authorization(authInfos).build();
- client.start();
- return client;
- }
-
public CuratorFramework getCurator() {
- return this.curator;
+ return this.zkManager.getCurator();
}
- public String getZkRootNodePassword() {
- return this.zkRootNodePassword;
+ public ZKManager getZKManager() {
+ return this.zkManager;
}
-
protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
Configuration conf) {
return new QueueACLsManager(scheduler, conf);
@@ -1264,8 +1236,8 @@ protected void serviceStop() throws Exception {
configurationProvider.close();
}
super.serviceStop();
- if (curator != null) {
- curator.close();
+ if (zkManager != null) {
+ zkManager.close();
}
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 1b3b367bb6c..7e4d1e226dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -46,7 +47,6 @@
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -64,6 +64,8 @@
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import static org.apache.hadoop.util.curator.ZKManager.getNodePath;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -202,7 +204,7 @@
new DigestAuthenticationProvider().getScheme();
@VisibleForTesting
- protected CuratorFramework curatorFramework;
+ protected ZKManager zkManager;
/*
* Indicates different app attempt state store operations.
@@ -258,7 +260,7 @@
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest(zkRootNodeUsername + ":"
- + resourceManager.getZkRootNodePassword()));
+ + zkManager.getZkRootNodePassword()));
zkRootNodeAclList.add(new ACL(CREATE_DELETE_PERMS, rmId));
return zkRootNodeAclList;
@@ -298,7 +300,7 @@ public synchronized void initInternal(Configuration conf)
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
}
- zkAcl = RMZKUtils.getZKAcls(conf);
+ zkAcl = ZKManager.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
@@ -330,10 +332,10 @@ public synchronized void initInternal(Configuration conf)
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
- curatorFramework = resourceManager.getCurator();
-
- if (curatorFramework == null) {
- curatorFramework = resourceManager.createAndStartCurator(conf);
+ zkManager = resourceManager.getZKManager();
+ if (zkManager == null) {
+ zkManager = new ZKManager();
+ zkManager.createAndStartCurator(conf);
}
}
@@ -341,30 +343,30 @@ public synchronized void initInternal(Configuration conf)
public synchronized void startInternal() throws Exception {
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
- create(zkRootNodePath);
+ zkManager.create(zkRootNodePath);
setRootNodeAcls();
- delete(fencingNodePath);
+ zkManager.delete(fencingNodePath);
if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
.isAutomaticFailoverEnabled(getConfig())) {
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
- create(rmAppRoot);
- create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
+ zkManager.create(rmAppRoot);
+ zkManager.create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
- create(rmAppRootHierarchies.get(splitIndex));
+ zkManager.create(rmAppRootHierarchies.get(splitIndex));
}
- create(rmDTSecretManagerRoot);
- create(dtMasterKeysRootPath);
- create(delegationTokensRootPath);
- create(dtSequenceNumberPath);
- create(amrmTokenSecretManagerRoot);
- create(reservationRoot);
+ zkManager.create(rmDTSecretManagerRoot);
+ zkManager.create(dtMasterKeysRootPath);
+ zkManager.create(delegationTokensRootPath);
+ zkManager.create(dtSequenceNumberPath);
+ zkManager.create(amrmTokenSecretManagerRoot);
+ zkManager.create(reservationRoot);
}
private void logRootNodeAcls(String prefix) throws Exception {
Stat getStat = new Stat();
- List<ACL> getAcls = getACL(zkRootNodePath);
+ List<ACL> getAcls = zkManager.getACL(zkRootNodePath);
StringBuilder builder = new StringBuilder();
builder.append(prefix);
@@ -382,6 +384,7 @@ private void setRootNodeAcls() throws Exception {
logRootNodeAcls("Before setting ACLs'\n");
}
+ CuratorFramework curatorFramework = zkManager.getCurator();
if (HAUtil.isHAEnabled(getConfig())) {
curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
} else {
@@ -401,7 +404,7 @@ protected synchronized void closeInternal() throws Exception {
}
if (!HAUtil.isHAEnabled(getConfig())) {
- IOUtils.closeStream(curatorFramework);
+ IOUtils.closeStream(zkManager.getCurator());
}
}
@@ -416,7 +419,7 @@ protected synchronized void storeVersion() throws Exception {
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
- if (exists(versionNodePath)) {
+ if (zkManager.exists(versionNodePath)) {
safeSetData(versionNodePath, data, -1);
} else {
safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
@@ -427,8 +430,8 @@ protected synchronized void storeVersion() throws Exception {
protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
- if (exists(versionNodePath)) {
- byte[] data = getData(versionNodePath);
+ if (zkManager.exists(versionNodePath)) {
+ byte[] data = zkManager.getData(versionNodePath);
return new VersionPBImpl(VersionProto.parseFrom(data));
}
@@ -440,9 +443,9 @@ public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = baseEpoch;
- if (exists(epochNodePath)) {
+ if (zkManager.exists(epochNodePath)) {
// load current epoch
- byte[] data = getData(epochNodePath);
+ byte[] data = zkManager.getData(epochNodePath);
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
@@ -475,7 +478,7 @@ public synchronized RMState loadState() throws Exception {
}
private void loadReservationSystemState(RMState rmState) throws Exception {
- List<String> planNodes = getChildren(reservationRoot);
+ List<String> planNodes = zkManager.getChildren(reservationRoot);
for (String planName : planNodes) {
if (LOG.isDebugEnabled()) {
@@ -483,7 +486,7 @@ private void loadReservationSystemState(RMState rmState) throws Exception {
}
String planNodePath = getNodePath(reservationRoot, planName);
- List<String> reservationNodes = getChildren(planNodePath);
+ List<String> reservationNodes = zkManager.getChildren(planNodePath);
for (String reservationNodeName : reservationNodes) {
String reservationNodePath =
@@ -493,7 +496,7 @@ private void loadReservationSystemState(RMState rmState) throws Exception {
LOG.debug("Loading reservation from znode: " + reservationNodePath);
}
- byte[] reservationData = getData(reservationNodePath);
+ byte[] reservationData = zkManager.getData(reservationNodePath);
ReservationAllocationStateProto allocationState =
ReservationAllocationStateProto.parseFrom(reservationData);
@@ -511,7 +514,7 @@ private void loadReservationSystemState(RMState rmState) throws Exception {
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
- byte[] data = getData(amrmTokenSecretManagerRoot);
+ byte[] data = zkManager.getData(amrmTokenSecretManagerRoot);
if (data == null) {
LOG.warn("There is no data saved");
@@ -533,11 +536,11 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState)
}
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
- List<String> childNodes = getChildren(dtMasterKeysRootPath);
+ List<String> childNodes = zkManager.getChildren(dtMasterKeysRootPath);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
- byte[] childData = getData(childNodePath);
+ byte[] childData = zkManager.getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@@ -562,7 +565,7 @@ private void loadRMDelegationKeyState(RMState rmState) throws Exception {
}
private void loadRMSequentialNumberState(RMState rmState) throws Exception {
- byte[] seqData = getData(dtSequenceNumberPath);
+ byte[] seqData = zkManager.getData(dtSequenceNumberPath);
if (seqData != null) {
ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
@@ -575,12 +578,12 @@ private void loadRMSequentialNumberState(RMState rmState) throws Exception {
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes =
- getChildren(delegationTokensRootPath);
+ zkManager.getChildren(delegationTokensRootPath);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
- byte[] childData = getData(childNodePath);
+ byte[] childData = zkManager.getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@@ -611,7 +614,7 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
String appIdStr) throws Exception {
- byte[] appData = getData(appNodePath);
+ byte[] appData = zkManager.getData(appNodePath);
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + appNodePath);
}
@@ -633,7 +636,7 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
if (appRoot == null) {
continue;
}
- List<String> childNodes = getChildren(appRoot);
+ List<String> childNodes = zkManager.getChildren(appRoot);
boolean appNodeFound = false;
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -644,7 +647,7 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
} else {
// If AppId Node is partitioned.
String parentNodePath = getNodePath(appRoot, childNodeName);
- List<String> leafNodes = getChildren(parentNodePath);
+ List<String> leafNodes = zkManager.getChildren(parentNodePath);
for (String leafNodeName : leafNodes) {
String appIdStr = childNodeName + leafNodeName;
loadRMAppStateFromAppNode(rmState,
@@ -667,12 +670,12 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
private void loadApplicationAttemptState(ApplicationStateData appState,
String appPath) throws Exception {
- List<String> attempts = getChildren(appPath);
+ List<String> attempts = zkManager.getChildren(appPath);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
- byte[] attemptData = getData(attemptPath);
+ byte[] attemptData = zkManager.getData(attemptPath);
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
@@ -709,7 +712,7 @@ private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
List<String> children = null;
try {
- children = getChildren(parentAppNode);
+ children = zkManager.getChildren(parentAppNode);
} catch (KeeperException.NoNodeException ke) {
// It should be fine to swallow this exception as the parent app node we
// intend to delete is already deleted.
@@ -770,7 +773,7 @@ protected synchronized void updateApplicationStateInternal(
boolean pathExists = true;
// Look for paths based on other split indices if path as per split index
// does not exist.
- if (!exists(nodeUpdatePath)) {
+ if (!zkManager.exists(nodeUpdatePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
@@ -780,7 +783,7 @@ protected synchronized void updateApplicationStateInternal(
if (appIdNodeSplitIndex != 0) {
String rootNode =
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
- if (!exists(rootNode)) {
+ if (!zkManager.exists(rootNode)) {
safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
}
}
@@ -816,7 +819,7 @@ private void handleApplicationAttemptStateOp(
String appId = appAttemptId.getApplicationId().toString();
String appDirPath = getLeafAppIdNodePath(appId, false);
// Look for paths based on other split indices.
- if (!exists(appDirPath)) {
+ if (!zkManager.exists(appDirPath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
if (alternatePathInfo == null) {
if (operation == AppAttemptOp.REMOVE) {
@@ -839,7 +842,7 @@ private void handleApplicationAttemptStateOp(
}
switch (operation) {
case UPDATE:
- if (exists(path)) {
+ if (zkManager.exists(path)) {
safeSetData(path, attemptStateData, -1);
} else {
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
@@ -912,7 +915,7 @@ private void removeApp(String removeAppId, boolean safeRemove,
int splitIndex = appIdNodeSplitIndex;
// Look for paths based on other split indices if path as per configured
// split index does not exist.
- if (!exists(appIdRemovePath)) {
+ if (!zkManager.exists(appIdRemovePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
@@ -936,7 +939,7 @@ private void removeApp(String removeAppId, boolean safeRemove,
}
safeDelete(appIdRemovePath);
} else {
- curatorFramework.delete().deletingChildrenIfNeeded().
+ zkManager.getCurator().delete().deletingChildrenIfNeeded().
forPath(appIdRemovePath);
}
// Check if we should remove the parent app node as well.
@@ -976,7 +979,7 @@ protected synchronized void updateRMDelegationTokenState(
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
- if (exists(nodeRemovePath)) {
+ if (zkManager.exists(nodeRemovePath)) {
// in case znode exists
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
} else {
@@ -1056,7 +1059,7 @@ protected synchronized void removeRMDTMasterKeyState(
@Override
public synchronized void deleteStore() throws Exception {
- delete(zkRootNodePath);
+ zkManager.delete(zkRootNodePath);
}
@Override
@@ -1065,11 +1068,6 @@ public synchronized void removeApplication(ApplicationId removeAppId)
removeApp(removeAppId.toString());
}
- @VisibleForTesting
- String getNodePath(String root, String nodeName) {
- return (root + "/" + nodeName);
- }
-
@Override
protected synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
@@ -1094,7 +1092,7 @@ protected synchronized void removeReservationState(String planName,
safeDelete(reservationPath);
- List<String> reservationNodes = getChildren(planNodePath);
+ List<String> reservationNodes = zkManager.getChildren(planNodePath);
if (reservationNodes.isEmpty()) {
safeDelete(planNodePath);
@@ -1121,7 +1119,7 @@ private void addOrUpdateReservationState(
reservationIdName);
byte[] reservationData = reservationAllocation.toByteArray();
- if (!exists(planCreatePath)) {
+ if (!zkManager.exists(planCreatePath)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
}
@@ -1157,7 +1155,7 @@ private void createRootDirRecursively(String path) throws Exception {
for (int i = 1; i < pathParts.length; i++) {
sb.append("/").append(pathParts[i]);
- create(sb.toString());
+ zkManager.create(sb.toString());
}
}
@@ -1176,7 +1174,7 @@ private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
if (splitIndex != appIdNodeSplitIndex) {
String alternatePath =
getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
- if (exists(alternatePath)) {
+ if (zkManager.exists(alternatePath)) {
return new AppNodeSplitInfo(alternatePath, splitIndex);
}
}
@@ -1205,7 +1203,7 @@ private String getLeafAppIdNodePath(String appId, String rootNode,
int splitIdx = nodeName.length() - appIdNodeSplitIdx;
String rootNodePath =
getNodePath(rootNode, nodeName.substring(0, splitIdx));
- if (createParentIfNotExists && !exists(rootNodePath)) {
+ if (createParentIfNotExists && !zkManager.exists(rootNodePath)) {
try {
safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
@@ -1234,45 +1232,9 @@ private String getLeafAppIdNodePath(String appId,
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
}
- @VisibleForTesting
- byte[] getData(final String path) throws Exception {
- return curatorFramework.getData().forPath(path);
- }
-
- @VisibleForTesting
- List<ACL> getACL(final String path) throws Exception {
- return curatorFramework.getACL().forPath(path);
- }
-
- @VisibleForTesting
- List<String> getChildren(final String path) throws Exception {
- return curatorFramework.getChildren().forPath(path);
- }
-
- @VisibleForTesting
- boolean exists(final String path) throws Exception {
- return curatorFramework.checkExists().forPath(path) != null;
- }
-
- @VisibleForTesting
- void create(final String path) throws Exception {
- if (!exists(path)) {
- curatorFramework.create()
- .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
- .forPath(path, null);
- }
- }
-
- @VisibleForTesting
- void delete(final String path) throws Exception {
- if (exists(path)) {
- curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
- }
- }
-
private void safeCreate(String path, byte[] data, List<ACL> acl,
CreateMode mode) throws Exception {
- if (!exists(path)) {
+ if (!zkManager.exists(path)) {
SafeTransaction transaction = new SafeTransaction();
transaction.create(path, data, acl, mode);
transaction.commit();
@@ -1285,7 +1247,7 @@ private void safeCreate(String path, byte[] data, List<ACL> acl,
* @throws Exception if any problem occurs while performing deletion.
*/
private void safeDelete(final String path) throws Exception {
- if (exists(path)) {
+ if (zkManager.exists(path)) {
SafeTransaction transaction = new SafeTransaction();
transaction.delete(path);
transaction.commit();
@@ -1310,7 +1272,7 @@ private void safeSetData(String path, byte[] data, int version)
private CuratorTransactionFinal transactionFinal;
SafeTransaction() throws Exception {
- CuratorTransaction transaction = curatorFramework.inTransaction();
+ CuratorTransaction transaction = zkManager.getCurator().inTransaction();
transactionFinal = transaction.create()
.withMode(CreateMode.PERSISTENT).withACL(zkAcl)
.forPath(fencingNodePath, new byte[0]).and();
@@ -1364,4 +1326,34 @@ public void run() {
}
}
}
+
+ @VisibleForTesting
+ protected List<ACL> getACL(String path) throws Exception {
+ return zkManager.getACL(path);
+ }
+
+ @VisibleForTesting
+ protected void create(String path) throws Exception {
+ zkManager.create(path);
+ }
+
+ @VisibleForTesting
+ protected boolean exists(String path) throws Exception {
+ return zkManager.exists(path);
+ }
+
+ @VisibleForTesting
+ protected List<String> getChildren(String path) throws Exception {
+ return zkManager.getChildren(path);
+ }
+
+ @VisibleForTesting
+ protected byte[] getData(String path) throws Exception {
+ return zkManager.getData(path);
+ }
+
+ @VisibleForTesting
+ protected void delete(String path) throws Exception {
+ zkManager.delete(path);
+ }
}
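
Note (not part of the patch): a minimal sketch of how the ZKManager delegation introduced above might be exercised on its own, using only the calls this change already relies on (createAndStartCurator(conf), create, exists, getData, getChildren, delete, close); the znode path "/rmstore-sketch" is a made-up example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKManager;

public class ZKManagerUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Mirror the fallback path in initInternal(): build a ZKManager and start
    // its Curator client when the ResourceManager does not supply one.
    ZKManager zkManager = new ZKManager();
    zkManager.createAndStartCurator(conf);
    try {
      // Idempotent create/exists/getData/getChildren/delete, matching the
      // calls that replaced the old private Curator helpers in ZKRMStateStore.
      zkManager.create("/rmstore-sketch");
      if (zkManager.exists("/rmstore-sketch")) {
        byte[] data = zkManager.getData("/rmstore-sketch");
        System.out.println("data length: " + (data == null ? 0 : data.length));
      }
      for (String child : zkManager.getChildren("/rmstore-sketch")) {
        System.out.println("child znode: " + child);
      }
      zkManager.delete("/rmstore-sketch");
    } finally {
      // The store only closes the client outside HA; here we always clean up.
      zkManager.close();
    }
  }
}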