diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cd4d569a77a..195adada541 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2590,7 +2590,7 @@ public static boolean isAclEnabled(Configuration conf) {
FEDERATION_PREFIX + "state-store.class";
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
- "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
+ "org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore"; // NOTE(review): default switches from MemoryFederationStateStore to the ZooKeeper store - confirm this is intended
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
@@ -2629,6 +2629,14 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
+ public static final String FEDERATION_STATESTORE_ZK_PREFIX =
+ FEDERATION_PREFIX + "zk-state-store.";
+ /**
+ * Parent znode path under which ZookeeperFederationStateStore will create
+ * its znodes.
+ */
+ public static final String FEDERATION_STATESTORE_RM_PARENT_PATH =
+ FEDERATION_STATESTORE_ZK_PREFIX + "parent-path";
+ public static final String DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH =
+ "/federationstore";
+
private static final String FEDERATION_STATESTORE_SQL_PREFIX =
FEDERATION_PREFIX + "state-store.sql.";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..15333ef3df8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -0,0 +1,653 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+
+/**
+ * ZooKeeper implementation of {@link FederationStateStore}.
+ *
+ * The znode structure is as follows:
+ * ROOT_DIR_PATH
+ * |--- MEMBERSHIP
+ * | |----- SC1
+ * | |----- SC2
+ * |--- APPLICATION
+ * | |----- APP1
+ * | |----- APP2
+ * |--- POLICY
+ * |----- QUEUE1
+ * |----- QUEUE2
+ */
+public class ZookeeperFederationStateStore implements FederationStateStore {
+
+  /** Logger of this class. */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
+
+  /** Name of the znode, under the base znode, holding the memberships. */
+  private static final String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
+  /** Name of the znode, under the base znode, holding the applications. */
+  private static final String ROOT_ZNODE_NAME_APPLICATION = "applications";
+  /** Name of the znode, under the base znode, holding the policies. */
+  private static final String ROOT_ZNODE_NAME_POLICY = "policies";
+
+  /** Interface to Zookeeper. */
+  private ZKCuratorManager zkManager;
+
+  /** Directory to store the state store data. */
+  private String baseZNode;
+
+  /** Path of the znode holding the applications; set in init(). */
+  private String appsZNode;
+  /** Path of the znode holding the memberships; set in init(). */
+  private String membershipZNode;
+  /** Path of the znode holding the policies; set in init(). */
+  private String policiesZNode;
+
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ LOG.info("Initializing ZooKeeper connection");
+
+ baseZNode = conf.get(
+ YarnConfiguration.FEDERATION_STATESTORE_RM_PARENT_PATH,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH);
+ try {
+ this.zkManager = new ZKCuratorManager(conf);
+ this.zkManager.start();
+ } catch (IOException e) {
+ LOG.error("Cannot initialize the ZK connection", e);
+ }
+
+ // Base znodes
+ membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+ appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+
+ // Create base znode for each entity
+ try {
+ createRootDirRecursively(membershipZNode);
+ createRootDirRecursively(appsZNode);
+ createRootDirRecursively(policiesZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot create base directories: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ }
+
+  /**
+   * Close the ZooKeeper manager, if one was created.
+   *
+   * @throws Exception If closing the manager fails.
+   */
+  @Override
+  public void close() throws Exception {
+    if (zkManager == null) {
+      return;
+    }
+    zkManager.close();
+  }
+
+  /**
+   * Record the home subcluster of an application without overwriting an
+   * existing entry, then read back the subcluster that is actually stored.
+   *
+   * @param request Request carrying the application and its home subcluster.
+   * @return Response with the home subcluster read back from the store.
+   * @throws YarnException If the request is invalid or the store fails.
+   */
+  @Override
+  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+      AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationHomeSubCluster appHome = request.getApplicationHomeSubCluster();
+    ApplicationId appId = appHome.getApplicationId();
+    SubClusterId requested = appHome.getHomeSubCluster();
+
+    // Write only if there is no entry yet (update == false)
+    try {
+      putApp(appId, requested, false);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot add application home subcluster for " + appId);
+    }
+
+    // Read back the stored value, which may differ from the requested one
+    SubClusterId stored = requested;
+    try {
+      stored = getApp(appId);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot check app home subcluster for " + appId);
+    }
+
+    return AddApplicationHomeSubClusterResponse.newInstance(stored);
+  }
+
+  /**
+   * Overwrite the home subcluster of an application that is already stored.
+   *
+   * @param request Request carrying the application and its new home.
+   * @return Empty response.
+   * @throws YarnException If the request is invalid, the application does
+   *           not exist or the store fails.
+   */
+  @Override
+  public UpdateApplicationHomeSubClusterResponse
+      updateApplicationHomeSubCluster(
+          UpdateApplicationHomeSubClusterRequest request)
+              throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationId appId =
+        request.getApplicationHomeSubCluster().getApplicationId();
+
+    // The application must already be known to the store
+    SubClusterId current = getApp(appId);
+    if (current == null) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Application " + appId + " does not exist");
+    }
+
+    SubClusterId target =
+        request.getApplicationHomeSubCluster().getHomeSubCluster();
+    putApp(appId, target, true);
+    return UpdateApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  /**
+   * Look up the home subcluster of an application.
+   *
+   * @param request Request carrying the application identifier.
+   * @return Response with the application and its home subcluster.
+   * @throws YarnException If the request is invalid, the application does
+   *           not exist or the store fails.
+   */
+  @Override
+  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+      GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationId appId = request.getApplicationId();
+
+    SubClusterId home = getApp(appId);
+    if (home == null) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Application " + appId + " does not exist");
+    }
+
+    ApplicationHomeSubCluster appHome =
+        ApplicationHomeSubCluster.newInstance(appId, home);
+    return GetApplicationHomeSubClusterResponse.newInstance(appHome);
+  }
+
+  /**
+   * List every application in the store together with its home subcluster.
+   *
+   * @param request Request (not inspected by this implementation).
+   * @return Response with one entry per child of the applications znode.
+   * @throws YarnException If the applications cannot be read.
+   */
+  @Override
+  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    // Typed list instead of a raw List so the element type is checked
+    List<ApplicationHomeSubCluster> result = new ArrayList<>();
+
+    try {
+      for (String child : zkManager.getChildren(appsZNode)) {
+        // Each child znode is named after the application identifier
+        ApplicationId appId = ApplicationId.fromString(child);
+        SubClusterId homeSubCluster = getApp(appId);
+        ApplicationHomeSubCluster app =
+            ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
+        result.add(app);
+      }
+    } catch (Exception e) {
+      String errMsg = "Cannot get apps: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+    return GetApplicationsHomeSubClusterResponse.newInstance(result);
+  }
+
+  /**
+   * Remove the entry of an application from the store.
+   *
+   * @param request Request carrying the application identifier.
+   * @return Empty response.
+   * @throws YarnException If the request is invalid, the application does
+   *           not exist or the store fails.
+   */
+  @Override
+  public DeleteApplicationHomeSubClusterResponse
+      deleteApplicationHomeSubCluster(
+          DeleteApplicationHomeSubClusterRequest request)
+              throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationId appId = request.getApplicationId();
+    String appZNode = getNodePath(appsZNode, appId.toString());
+
+    // Only an existing application can be deleted
+    boolean found = false;
+    try {
+      found = zkManager.exists(appZNode);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot check app: " + e.getMessage());
+    }
+    if (!found) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Application " + appId + " does not exist");
+    }
+
+    try {
+      zkManager.delete(appZNode);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot delete app: " + e.getMessage());
+    }
+
+    return DeleteApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  /**
+   * Store the information of a subcluster, stamping it with the current time
+   * as its last heartbeat and overwriting any previous entry.
+   *
+   * @param request Request carrying the subcluster information.
+   * @return Empty response.
+   * @throws YarnException If the request is invalid or the store fails.
+   */
+  @Override
+  public SubClusterRegisterResponse registerSubCluster(
+      SubClusterRegisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator.validate(request);
+    SubClusterInfo info = request.getSubClusterInfo();
+    SubClusterId id = info.getSubClusterId();
+
+    // Registration counts as the first heartbeat
+    info.setLastHeartBeat(getCurrentTime());
+
+    try {
+      putSubclusterInfo(id, info, true);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot register subcluster: " + e.getMessage());
+    }
+    return SubClusterRegisterResponse.newInstance();
+  }
+
+  /**
+   * Set the state of a stored subcluster to the state given in the request.
+   *
+   * @param request Request carrying the subcluster identifier and state.
+   * @return Empty response.
+   * @throws YarnException If the request is invalid, the subcluster is not
+   *           found or the store fails.
+   */
+  @Override
+  public SubClusterDeregisterResponse deregisterSubCluster(
+      SubClusterDeregisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator.validate(request);
+    SubClusterId subClusterId = request.getSubClusterId();
+    SubClusterState newState = request.getState();
+
+    // Read the stored entry, change its state and write it back
+    SubClusterInfo stored = getSubclusterInfo(subClusterId);
+    if (stored != null) {
+      stored.setState(newState);
+      putSubclusterInfo(subClusterId, stored, true);
+    } else {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "SubCluster " + subClusterId + " not found");
+    }
+
+    return SubClusterDeregisterResponse.newInstance();
+  }
+
+  /**
+   * Refresh the last heartbeat, state and capability of a stored subcluster.
+   *
+   * @param request Request carrying the identifier, state and capability.
+   * @return Empty response.
+   * @throws YarnException If the request is invalid, the subcluster does not
+   *           exist or the store fails.
+   */
+  @Override
+  public SubClusterHeartbeatResponse subClusterHeartbeat(
+      SubClusterHeartbeatRequest request) throws YarnException {
+
+    FederationMembershipStateStoreInputValidator.validate(request);
+    SubClusterId subClusterId = request.getSubClusterId();
+
+    SubClusterInfo stored = getSubclusterInfo(subClusterId);
+    if (stored == null) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "SubCluster " + subClusterId + " does not exist; cannot heartbeat");
+    }
+
+    // Apply the heartbeat to the stored entry and write it back
+    stored.setLastHeartBeat(getCurrentTime());
+    stored.setState(request.getState());
+    stored.setCapability(request.getCapability());
+    putSubclusterInfo(subClusterId, stored, true);
+
+    return SubClusterHeartbeatResponse.newInstance();
+  }
+
+  /**
+   * Look up the information of a subcluster.
+   *
+   * @param request Request carrying the subcluster identifier.
+   * @return Response with the subcluster information, or {@code null} if
+   *         the subcluster is not in the store.
+   * @throws YarnException If the request is invalid or the store fails.
+   */
+  @Override
+  public GetSubClusterInfoResponse getSubCluster(
+      GetSubClusterInfoRequest request) throws YarnException {
+
+    FederationMembershipStateStoreInputValidator.validate(request);
+    SubClusterId subClusterId = request.getSubClusterId();
+
+    SubClusterInfo info = null;
+    try {
+      info = getSubclusterInfo(subClusterId);
+      if (info == null) {
+        LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
+        return null;
+      }
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot get subcluster: " + e.getMessage());
+    }
+    return GetSubClusterInfoResponse.newInstance(info);
+  }
+
+  /**
+   * List the subclusters in the store, optionally keeping only those whose
+   * state is active.
+   *
+   * @param request Request telling whether inactive subclusters are
+   *          filtered out.
+   * @return Response with the selected subclusters.
+   * @throws YarnException If the subclusters cannot be read.
+   */
+  @Override
+  public GetSubClustersInfoResponse getSubClusters(
+      GetSubClustersInfoRequest request) throws YarnException {
+    // Typed list instead of a raw List so the element type is checked
+    List<SubClusterInfo> result = new ArrayList<>();
+
+    try {
+      for (String child : zkManager.getChildren(membershipZNode)) {
+        // Each child znode is named after the subcluster identifier
+        SubClusterId subClusterId = SubClusterId.newInstance(child);
+        SubClusterInfo info = getSubclusterInfo(subClusterId);
+        if (!request.getFilterInactiveSubClusters() ||
+            info.getState().isActive()) {
+          result.add(info);
+        }
+      }
+    } catch (Exception e) {
+      String errMsg = "Cannot get subclusters: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    return GetSubClustersInfoResponse.newInstance(result);
+  }
+
+
+  /**
+   * Look up the policy configuration of a queue.
+   *
+   * @param request Request carrying the queue name.
+   * @return Response with the policy, or {@code null} if the queue has no
+   *         policy in the store.
+   * @throws YarnException If the request is invalid or the store fails.
+   */
+  @Override
+  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator.validate(request);
+    String queue = request.getQueue();
+
+    SubClusterPolicyConfiguration found = null;
+    try {
+      found = getPolicy(queue);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot get policy: " + e.getMessage());
+    }
+
+    if (found != null) {
+      return GetSubClusterPolicyConfigurationResponse.newInstance(found);
+    }
+    LOG.warn("Policy for queue: {} does not exist.", queue);
+    return null;
+  }
+
+  /**
+   * Store the policy configuration of a queue, overwriting any previous one.
+   *
+   * @param request Request carrying the policy configuration.
+   * @return Empty response.
+   * @throws YarnException If the request is invalid or the store fails.
+   */
+  @Override
+  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator.validate(request);
+    SubClusterPolicyConfiguration newPolicy = request.getPolicyConfiguration();
+    try {
+      // The policy is stored under the name of its queue
+      putPolicy(newPolicy.getQueue(), newPolicy, true);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot set policy: " + e.getMessage());
+    }
+    return SetSubClusterPolicyConfigurationResponse.newInstance();
+  }
+
+  /**
+   * List the policy configurations of all the queues in the store.
+   *
+   * @param request Request (not inspected by this implementation).
+   * @return Response with one entry per child of the policies znode.
+   * @throws YarnException If the policies cannot be read.
+   */
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+    // Typed list instead of a raw List so the element type is checked
+    List<SubClusterPolicyConfiguration> result = new ArrayList<>();
+
+    try {
+      for (String child : zkManager.getChildren(policiesZNode)) {
+        // Each child znode is named after the queue
+        SubClusterPolicyConfiguration policy = getPolicy(child);
+        result.add(policy);
+      }
+    } catch (Exception e) {
+      String errMsg = "Cannot get policies: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
+  }
+
+  /**
+   * Get the current version of the store.
+   *
+   * @return Always {@code null} in this implementation.
+   */
+  @Override
+  public Version getCurrentVersion() {
+    return null;
+  }
+
+  /**
+   * Load the version of the store.
+   *
+   * @return Always {@code null} in this implementation.
+   */
+  @Override
+  public Version loadVersion() {
+    return null;
+  }
+
+  /**
+   * Read the home subcluster of an application from its znode.
+   * @param appId Application identifier.
+   * @return Subcluster identifier, or null if the znode has no data to read.
+   * @throws YarnException If the znode cannot be read or parsed.
+   */
+  private SubClusterId getApp(final ApplicationId appId) throws YarnException {
+    String appZNode = getNodePath(appsZNode, appId.toString());
+
+    byte[] raw = get(appZNode);
+    if (raw == null) {
+      return null;
+    }
+    try {
+      return new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(raw));
+    } catch (InvalidProtocolBufferException e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot parse application at " + appZNode);
+    }
+    return null;
+  }
+
+  /**
+   * Write the home subcluster of an application into its znode.
+   * @param appId Application identifier.
+   * @param subClusterId Subcluster identifier.
+   * @param update If false, an already existing znode is left untouched.
+   * @throws YarnException If the znode cannot be written.
+   */
+  private void putApp(final ApplicationId appId,
+      final SubClusterId subClusterId, boolean update)
+      throws YarnException {
+    String appZNode = getNodePath(appsZNode, appId.toString());
+    // The znode stores the serialized protobuf of the subcluster identifier
+    byte[] serialized =
+        ((SubClusterIdPBImpl)subClusterId).getProto().toByteArray();
+    put(appZNode, serialized, update);
+  }
+
+  /**
+   * Read the information of a subcluster from its znode.
+   * @param subclusterId Subcluster identifier.
+   * @return Subcluster information or null if it doesn't exist.
+   * @throws YarnException If the znode cannot be read or parsed.
+   */
+  private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
+      throws YarnException {
+    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+
+    byte[] raw = get(memberZNode);
+    if (raw == null) {
+      return null;
+    }
+    try {
+      return new SubClusterInfoPBImpl(SubClusterInfoProto.parseFrom(raw));
+    } catch (InvalidProtocolBufferException e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot parse subcluster info at " + memberZNode);
+    }
+    return null;
+  }
+
+  /**
+   * Write the information of a subcluster into its znode.
+   * @param subclusterId Subcluster identifier.
+   * @param subClusterInfo Subcluster information.
+   * @param update If false, an already existing znode is left untouched.
+   * @throws YarnException If the znode cannot be written.
+   */
+  private void putSubclusterInfo(final SubClusterId subclusterId,
+      final SubClusterInfo subClusterInfo, final boolean update)
+      throws YarnException {
+    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+    // The znode stores the serialized protobuf of the subcluster information
+    byte[] serialized =
+        ((SubClusterInfoPBImpl)subClusterInfo).getProto().toByteArray();
+    put(memberZNode, serialized, update);
+  }
+
+  /**
+   * Read the policy configuration of a queue from its znode.
+   * @param queue Name of the queue.
+   * @return Subcluster policy configuration, or null if the znode has no
+   *         data to read.
+   * @throws YarnException If the znode cannot be read or parsed.
+   */
+  private SubClusterPolicyConfiguration getPolicy(final String queue)
+      throws YarnException {
+    String policyZNode = getNodePath(policiesZNode, queue);
+
+    byte[] raw = get(policyZNode);
+    if (raw == null) {
+      return null;
+    }
+    try {
+      return new SubClusterPolicyConfigurationPBImpl(
+          SubClusterPolicyConfigurationProto.parseFrom(raw));
+    } catch (InvalidProtocolBufferException e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot parse policy at " + policyZNode);
+    }
+    return null;
+  }
+
+  /**
+   * Put the policy configuration of a queue in Zookeeper.
+   * @param queue Name of the queue.
+   * @param policy Subcluster policy configuration.
+   * @param update If false, an already existing znode is left untouched.
+   * @throws YarnException If it cannot contact ZooKeeper.
+   */
+  private void putPolicy(final String queue,
+      final SubClusterPolicyConfiguration policy, boolean update)
+      throws YarnException {
+    // Policies live under the policies znode, which is where getPolicy()
+    // and getPoliciesConfigurations() read them; writing them under the
+    // membership znode would make them unreadable as policies and would
+    // show up as bogus subclusters when the memberships are listed.
+    String policyZNode = getNodePath(policiesZNode, queue);
+
+    SubClusterPolicyConfigurationProto proto =
+        ((SubClusterPolicyConfigurationPBImpl)policy).getProto();
+    byte[] data = proto.toByteArray();
+    put(policyZNode, data, update);
+  }
+
+  /**
+   * Read the data stored in a znode.
+   * @param znode Path of the znode.
+   * @return Data in the znode, or null if the znode does not exist.
+   * @throws YarnException If it cannot contact ZooKeeper.
+   */
+  private byte[] get(String znode) throws YarnException {
+    boolean found = false;
+    try {
+      found = zkManager.exists(znode);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot find znode " + znode);
+    }
+    if (!found) {
+      LOG.error("{} does not exist", znode);
+      return null;
+    }
+
+    try {
+      return zkManager.getData(znode);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot get data from znode " + znode + ": " + e.getMessage());
+    }
+    return null;
+  }
+
+  /**
+   * Create a znode if needed and write data into it.
+   * @param znode Path of the znode.
+   * @param data Data to write.
+   * @param update If false and the znode was not created by this call, the
+   *          data is not written.
+   * @throws YarnException If it cannot contact ZooKeeper.
+   */
+  private void put(String znode, byte[] data, boolean update)
+      throws YarnException {
+    // Try to create the znode first
+    boolean createdNow = false;
+    try {
+      createdNow = zkManager.create(znode);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot create znode " + znode + ": " + e.getMessage());
+    }
+    if (!createdNow) {
+      LOG.debug("{} not created", znode);
+    }
+    if (!createdNow && !update) {
+      LOG.info("{} already existed and we are not updating", znode);
+      return;
+    }
+
+    // Write the data regardless of the current znode version (-1)
+    try {
+      zkManager.setData(znode, data, -1);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot write data into znode " + znode + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Create the znode at the given absolute path together with every one of
+   * its ancestors, from the top level down.
+   * @param path Absolute path of the znode; it must start with "/".
+   * @throws Exception If a znode cannot be created.
+   */
+  private void createRootDirRecursively(final String path) throws Exception {
+    String[] segments = path.split("/");
+    // An absolute path splits into an empty first segment
+    boolean absolute = segments.length >= 1 && segments[0].isEmpty();
+    Preconditions.checkArgument(absolute, "Invalid path: %s", path);
+
+    String current = "";
+    for (int level = 1; level < segments.length; level++) {
+      current = current + "/" + segments[level];
+      zkManager.create(current);
+    }
+  }
+
+  /**
+   * Get the current time from a UTC calendar.
+   * @return Current time in milliseconds.
+   */
+  private static long getCurrentTime() {
+    TimeZone utc = TimeZone.getTimeZone("UTC");
+    return Calendar.getInstance(utc).getTimeInMillis();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..390b8037b16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ZookeeperFederationStateStore.
+ */
+public class TestZookeeperFederationStateStore
+    extends FederationStateStoreBaseTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestZookeeperFederationStateStore.class);
+
+  /** Zookeeper test server. */
+  private static TestingServer curatorTestingServer;
+  /** Curator client connected to the test server. */
+  private static CuratorFramework curatorFramework;
+
+  /**
+   * Start a ZooKeeper test server and a client for it, point the test
+   * configuration at the server, then run the base class setup.
+   */
+  @Before
+  public void before() throws IOException, YarnException {
+    try {
+      curatorTestingServer = new TestingServer();
+      curatorTestingServer.start();
+      String connectString = curatorTestingServer.getConnectString();
+      curatorFramework = CuratorFrameworkFactory.builder()
+          .connectString(connectString)
+          .retryPolicy(new RetryNTimes(100, 100))
+          .build();
+      curatorFramework.start();
+
+      Configuration conf = new YarnConfiguration();
+      conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+      setConf(conf);
+    } catch (Exception e) {
+      LOG.error("Cannot initialize ZooKeeper store", e);
+      throw new IOException(e);
+    }
+
+    super.before();
+  }
+
+  /**
+   * Run the base class teardown, then release the client and the server.
+   */
+  @After
+  public void after() throws Exception {
+    try {
+      super.after();
+    } finally {
+      // Release the ZooKeeper resources even if the base teardown fails;
+      // either field may still be null if before() failed part-way.
+      if (curatorFramework != null) {
+        curatorFramework.close();
+      }
+      if (curatorTestingServer != null) {
+        try {
+          curatorTestingServer.stop();
+        } catch (IOException e) {
+          // Best effort: a server that fails to stop must not fail the test
+          LOG.warn("Cannot stop the ZooKeeper test server", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected FederationStateStore createStateStore() {
+    Configuration conf = new Configuration();
+    // NOTE(review): the base class is not visible here. If it calls this
+    // method after before() has called setConf(), a blank Configuration
+    // would drop the ZooKeeper address, so it is re-applied - confirm.
+    if (curatorTestingServer != null) {
+      conf.set(CommonConfigurationKeys.ZK_ADDRESS,
+          curatorTestingServer.getConnectString());
+    }
+    super.setConf(conf);
+    return new ZookeeperFederationStateStore();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index ecf61c5d417..8f2237f392d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -160,11 +160,7 @@ These are common configurations that should appear in the **conf/yarn-site.xml**
| Property | Example | Description |
|:---- |:---- |
|`yarn.federation.enabled` | `true` | Whether federation is enabled or not |
-|`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore` | The type of state-store to use. |
-|`yarn.federation.state-store.sql.url` | `jdbc:mysql://:/FederationStateStore` | For SQLFederationStateStore the name of the DB where the state is stored. |
-|`yarn.federation.state-store.sql.jdbc-class` | `com.mysql.jdbc.jdbc2.optional.MysqlDataSource` | For SQLFederationStateStore the jdbc class to use. |
-|`yarn.federation.state-store.sql.username` | `` | For SQLFederationStateStore the username for the DB connection. |
-|`yarn.federation.state-store.sql.password` | `` | For SQLFederationStateStore the password for the DB connection. |
+|`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. |
|`yarn.resourcemanager.cluster-id` | `` | The unique subcluster identifier for this RM (same as the one used for HA). |
@@ -238,7 +234,21 @@ Optional:
###State-Store:
-Currently, we support only SQL based implementation of state-store (ZooKeeper is in the works), i.e. either MySQL or Microsoft SQL Server.
+Currently, we support ZooKeeper and SQL based implementations of the state-store.
+
+For ZooKeeper, one must set the ZooKeeper settings for Hadoop:
+| Property | Example | Description |
+|:---- |:---- |:---- |
+|`hadoop.zk.address` | `host:port` | The address for the ZooKeeper ensemble. |
+
+For SQL (i.e., either MySQL or Microsoft SQL Server), one must set up the following parameters:
+| Property | Example | Description |
+|:---- |:---- |:---- |
+|`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore` | The type of state-store to use. |
+|`yarn.federation.state-store.sql.url` | `jdbc:mysql://:/FederationStateStore` | For SQLFederationStateStore the name of the DB where the state is stored. |
+|`yarn.federation.state-store.sql.jdbc-class` | `com.mysql.jdbc.jdbc2.optional.MysqlDataSource` | For SQLFederationStateStore the jdbc class to use. |
+|`yarn.federation.state-store.sql.username` | `` | For SQLFederationStateStore the username for the DB connection. |
+|`yarn.federation.state-store.sql.password` | `` | For SQLFederationStateStore the password for the DB connection. |
For MySQL, one must download the latest jar version 5.x from [MVN Repository](https://mvnrepository.com/artifact/mysql/mysql-connector-java) and add it to the CLASSPATH.
Then the DB schema is created by executing the following SQL scripts in the database: