diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..a2a8743fe4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -0,0 +1,807 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+
+/**
+ * ZooKeeper implementation of {@link FederationStateStore}.
+ */
+public class ZookeeperFederationStateStore implements FederationStateStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
+
+ /** Directory to store the state store data. */
+ private String baseZNode;
+
+ // ZooKeeper settings
+ private int zkSessionTimeout = 15000;
+ private int zkNumRetries = 1000;
+ private int zkRetryInterval = 1000;
+ private List zkAcl;
+ private List zkAuths;
+
+ private CuratorFramework curatorFramework;
+
+ private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
+ private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
+ private final static String ROOT_ZNODE_NAME_POLICY = "policies";
+
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ LOG.info("Initializing ZooKeeper connection");
+
+ baseZNode = conf.get(
+ YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
+ try {
+ curatorFramework = createAndStartCurator(conf);
+ } catch (IOException e) {
+ LOG.error("Cannot initialize the ZK connection", e);
+ }
+
+ // Create base znode for each entity
+ try {
+ createRootDirRecursively(getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP));
+ createRootDirRecursively(getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION));
+ createRootDirRecursively(getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY));
+ } catch (Exception e) {
+ String errMsg = "Cannot create base directories: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+
+ /**
+ * Create and start the ZooKeeper Curator.
+ *
+ * @param conf Configuration for the curator.
+ * @return Curator framework.
+ * @throws IOException If the initialization failed.
+ */
+ private CuratorFramework createAndStartCurator(Configuration conf)
+ throws IOException {
+
+ String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+ if (zkHostPort == null) {
+ throw new RuntimeException(
+ YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
+ }
+
+ // Setup ZK auths
+ this.zkAuths = getZKAuths(conf);
+ List authInfos = new ArrayList<>();
+ for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+ authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+ }
+
+ this.zkAcl = getZKAcls(conf);
+
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkHostPort)
+ .sessionTimeoutMs(zkSessionTimeout)
+ .retryPolicy(new RetryNTimes(zkNumRetries, zkRetryInterval))
+ .authorization(authInfos).build();
+ client.start();
+ return client;
+ }
+
+ /**
+ * Get the ZK authorization from the configuration.
+ * @param conf Configuration.
+ * @return List of ZK authorizations.
+ * @throws IOException If the Zookeeper ACLs configuration file
+ * cannot be read
+ */
+ private List getZKAuths(Configuration conf)
+ throws IOException {
+ try {
+ String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
+ zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
+ if (zkAuthConf != null) {
+ return ZKUtil.parseAuth(zkAuthConf);
+ } else {
+ return Collections.emptyList();
+ }
+ } catch (IOException | ZKUtil.BadAuthFormatException e) {
+ LOG.error("Couldn't read Auth based on " +
+ YarnConfiguration.RM_ZK_AUTH);
+ throw e;
+ }
+ }
+
+ /**
+ * Get the ZK ACLs from the configuration.
+ * @param conf Configuration.
+ * @return List of ACLs.
+ * @throws IOException
+ */
+ private static List getZKAcls(Configuration conf) throws IOException {
+ // Parse authentication from configuration.
+ String zkAclConf = conf.get(
+ YarnConfiguration.RM_ZK_ACL,
+ YarnConfiguration.DEFAULT_RM_ZK_ACL);
+ try {
+ zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+ return ZKUtil.parseACLs(zkAclConf);
+ } catch (IOException | ZKUtil.BadAclFormatException e) {
+ LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (curatorFramework != null) {
+ curatorFramework.close();
+ }
+ }
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+ ApplicationId appId = app.getApplicationId();
+
+ // Try to write the subcluster
+ SubClusterId homeSubCluster = app.getHomeSubCluster();
+ try {
+ putApp(appId, homeSubCluster, false);
+ } catch (Exception e) {
+ String errMsg = "Cannot add application home subcluster for " + appId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ // Check for the actual subcluster
+ try {
+ homeSubCluster = getApp(appId);
+ } catch (Exception e) {
+ String errMsg = "Cannot check app home subcluster for " + appId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return AddApplicationHomeSubClusterResponse
+ .newInstance(homeSubCluster);
+ }
+
+ @Override
+ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+ ApplicationId appId = app.getApplicationId();
+ SubClusterId homeSubCluster = getApp(appId);
+ if (homeSubCluster == null) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ SubClusterId newSubClusterId =
+ request.getApplicationHomeSubCluster().getHomeSubCluster();
+ putApp(appId, newSubClusterId, true);
+ return UpdateApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationId appId = request.getApplicationId();
+ SubClusterId homeSubCluster = getApp(appId);
+ if (homeSubCluster == null) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetApplicationHomeSubClusterResponse.newInstance(
+ ApplicationHomeSubCluster.newInstance(appId, homeSubCluster));
+ }
+
+ @Override
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest request) throws YarnException {
+ List result = new ArrayList<>();
+
+ String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ try {
+ for (String child : getChildren(appsZNode)) {
+ ApplicationId appId = ApplicationId.fromString(child);
+ SubClusterId homeSubCluster = getApp(appId);
+ result.add(ApplicationHomeSubCluster.newInstance(appId, homeSubCluster));
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get apps: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return GetApplicationsHomeSubClusterResponse.newInstance(result);
+ }
+
+ @Override
+ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationId appId = request.getApplicationId();
+ String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ String appZNode = getNodePath(appsZNode, appId.toString());
+
+ boolean exists = false;
+ try {
+ exists = exists(appZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot check app: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ if (!exists) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ try {
+ delete(appZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot delete app: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return DeleteApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterRegisterResponse registerSubCluster(
+ SubClusterRegisterRequest request) throws YarnException {
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterInfo subClusterInfo = request.getSubClusterInfo();
+ SubClusterId subclusterId = subClusterInfo.getSubClusterId();
+
+ // Update the heartbeat time
+ long currentTime = getCurrentTime();
+ subClusterInfo.setLastHeartBeat(currentTime);
+
+ try {
+ putSubclusterInfo(subclusterId, subClusterInfo, true);
+ } catch (Exception e) {
+ String errMsg = "Cannot register subcluster: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return SubClusterRegisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterDeregisterResponse deregisterSubCluster(
+ SubClusterDeregisterRequest request) throws YarnException {
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterId subClusterId = request.getSubClusterId();
+ SubClusterState state = request.getState();
+
+ // Get the current information and update it
+ SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId);
+ if (subClusterInfo == null) {
+ String errMsg = "SubCluster " + subClusterId + " not found";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ } else {
+ subClusterInfo.setState(state);
+ putSubclusterInfo(subClusterId, subClusterInfo, true);
+ }
+
+ return SubClusterDeregisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
+ SubClusterHeartbeatRequest request) throws YarnException {
+
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterId subClusterId = request.getSubClusterId();
+
+ SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId);
+ if (subClusterInfo == null) {
+ String errMsg = "SubCluster " + subClusterId
+ + " does not exist; cannot heartbeat";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ long currentTime = getCurrentTime();
+ subClusterInfo.setLastHeartBeat(currentTime);
+ subClusterInfo.setState(request.getState());
+ subClusterInfo.setCapability(request.getCapability());
+
+ putSubclusterInfo(subClusterId, subClusterInfo, true);
+
+ return SubClusterHeartbeatResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterInfoResponse getSubCluster(
+ GetSubClusterInfoRequest request) throws YarnException {
+
+ FederationMembershipStateStoreInputValidator.validate(request);
+ SubClusterId subClusterId = request.getSubClusterId();
+ SubClusterInfo subClusterInfo = null;
+ try {
+ subClusterInfo = getSubclusterInfo(subClusterId);
+ if (subClusterInfo == null) {
+ LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
+ return null;
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get subcluster: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetSubClusterInfoResponse.newInstance(subClusterInfo);
+ }
+
+ @Override
+ public GetSubClustersInfoResponse getSubClusters(
+ GetSubClustersInfoRequest request) throws YarnException {
+ List result = new ArrayList<>();
+
+ String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+ try {
+ for (String child : getChildren(membershipZNode)) {
+ SubClusterId subClusterId = SubClusterId.newInstance(child);
+ SubClusterInfo info = getSubclusterInfo(subClusterId);
+ if (!request.getFilterInactiveSubClusters() ||
+ info.getState().isActive()) {
+ result.add(info);
+ }
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get subclusters: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetSubClustersInfoResponse.newInstance(result);
+ }
+
+
+ @Override
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+ FederationPolicyStoreInputValidator.validate(request);
+ String queue = request.getQueue();
+ SubClusterPolicyConfiguration policy = null;;
+ try {
+ policy = getPolicy(queue);
+ } catch (Exception e) {
+ String errMsg = "Cannot get policy: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ if (policy == null) {
+ LOG.warn("Policy for queue: {} does not exist.", queue);
+ return null;
+ }
+ return GetSubClusterPolicyConfigurationResponse
+ .newInstance(policy);
+ }
+
+ @Override
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+ FederationPolicyStoreInputValidator.validate(request);
+ SubClusterPolicyConfiguration policy =
+ request.getPolicyConfiguration();
+ try {
+ String queue = policy.getQueue();
+ putPolicy(queue, policy, true);
+ } catch (Exception e) {
+ String errMsg = "Cannot set policy: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return SetSubClusterPolicyConfigurationResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+ List result = new ArrayList<>();
+
+ String policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+ try {
+ for (String child : getChildren(policiesZNode)) {
+ SubClusterPolicyConfiguration policy = getPolicy(child);
+ result.add(policy);
+ }
+ } catch (Exception e) {
+ String errMsg = "Cannot get policies: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
+ }
+
+ @Override
+ public Version getCurrentVersion() {
+ return null;
+ }
+
+ @Override
+ public Version loadVersion() {
+ return null;
+ }
+
+ /**
+ * Get the subcluster for an application.
+ * @param appId Application identifier.
+ * @return Subcluster identifier.
+ * @throws Exception If it cannot contact ZooKeeper.
+ */
+ private SubClusterId getApp(final ApplicationId appId) throws YarnException {
+ String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ String appZNode = getNodePath(appsZNode, appId.toString());
+
+ SubClusterId subClusterId = null;
+ byte[] data = get(appZNode);
+ if (data != null) {
+ try {
+ subClusterId = new SubClusterIdPBImpl(
+ SubClusterIdProto.parseFrom(data));
+ } catch (InvalidProtocolBufferException e) {
+ String errMsg = "Cannot parse application at " + appZNode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+ return subClusterId;
+ }
+
+ /**
+ * Put an application.
+ * @param appId Application identifier.
+ * @param subClusterId Subcluster identifier.
+ * @throws Exception If it cannot contact ZooKeeper.
+ */
+ private void putApp(final ApplicationId appId,
+ final SubClusterId subClusterId, boolean update)
+ throws YarnException {
+ String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ String appZNode = getNodePath(appsZNode, appId.toString());
+ SubClusterIdProto proto =
+ ((SubClusterIdPBImpl)subClusterId).getProto();
+ byte[] data = proto.toByteArray();
+ put(appZNode, data, update);
+ }
+
+ /**
+ * Get the current information for a subcluster from Zookeeper.
+ * @param subclusterId Subcluster identifier.
+ * @return Subcluster information or null if it doesn't exist.
+ * @throws Exception If it cannot contact ZooKeeper.
+ */
+ private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
+ throws YarnException {
+ String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+ String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+
+ SubClusterInfo policy = null;
+ byte[] data = get(memberZNode);
+ if (data != null) {
+ try {
+ policy = new SubClusterInfoPBImpl(
+ SubClusterInfoProto.parseFrom(data));
+ } catch (InvalidProtocolBufferException e) {
+ String errMsg = "Cannot parse subcluster info at " + memberZNode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+ return policy;
+ }
+
+ /**
+ * Put the subcluster information in Zookeeper.
+ * @param subclusterId Subcluster identifier.
+ * @param subClusterInfo Subcluster information.
+ * @throws Exception If it cannot contact ZooKeeper.
+ */
+ private void putSubclusterInfo(final SubClusterId subclusterId,
+ final SubClusterInfo subClusterInfo, final boolean update)
+ throws YarnException {
+ String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+ String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+ SubClusterInfoProto proto =
+ ((SubClusterInfoPBImpl)subClusterInfo).getProto();
+ byte[] data = proto.toByteArray();
+ put(memberZNode, data, update);
+ }
+
+ /**
+ * Get the queue policy from Zookeeper.
+ * @param queue Name of the queue.
+ * @return Subcluster policy configuration.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private SubClusterPolicyConfiguration getPolicy(final String queue)
+ throws YarnException {
+ String policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+ String policyZNode = getNodePath(policiesZNode, queue);
+
+ SubClusterPolicyConfiguration policy = null;
+ byte[] data = get(policyZNode);
+ if (data != null) {
+ try {
+ policy = new SubClusterPolicyConfigurationPBImpl(
+ SubClusterPolicyConfigurationProto.parseFrom(data));
+ } catch (InvalidProtocolBufferException e) {
+ String errMsg = "Cannot parse policy at " + policyZNode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+ return policy;
+ }
+
+ /**
+ * Put the subcluster information in Zookeeper.
+ * @param queue Name of the queue.
+ * @param policy Subcluster policy configuration.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private void putPolicy(final String queue,
+ final SubClusterPolicyConfiguration policy, boolean update)
+ throws YarnException {
+ String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+ String memberZNode = getNodePath(membershipZNode, queue);
+
+ SubClusterPolicyConfigurationProto proto =
+ ((SubClusterPolicyConfigurationPBImpl)policy).getProto();
+ byte[] data = proto.toByteArray();
+ put(memberZNode, data, update);
+ }
+
+ /**
+ * Get data from a znode in Zookeeper.
+ * @param znode Path of the znode.
+ * @return Data in the znode.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private byte[] get(String znode) throws YarnException {
+ boolean exists = false;
+ try {
+ exists = exists(znode);
+ } catch (Exception e) {
+ String errMsg = "Cannot find znode " + znode;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ if (!exists) {
+ LOG.error("{} does not exist", znode);
+ return null;
+ }
+
+ Stat stat = new Stat();
+ byte[] data = null;
+ try {
+ data = getData(znode, stat);
+ } catch (Exception e) {
+ String errMsg = "Cannot get data from znode " + znode
+ + ": " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return data;
+ }
+
+ /**
+ * Put data into a znode in Zookeeper.
+ * @param znode Path of the znode.
+ * @param data Data to write.
+ * @throws YarnException If it cannot contact ZooKeeper.
+ */
+ private void put(String znode, byte[] data, boolean update)
+ throws YarnException {
+ // Create the znode
+ boolean created = false;
+ try {
+ created = create(znode);
+ } catch (Exception e) {
+ String errMsg = "Cannot create znode " + znode + ": " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ if (!created) {
+ LOG.debug("{} not created", znode);
+ if (!update) {
+ LOG.info("{} already existed and we are not updating", znode);
+ return;
+ }
+ }
+
+ // Write the data into the znode
+ try {
+ setData(znode, data, -1);
+ } catch (Exception e) {
+ String errMsg = "Cannot write data into znode " + znode
+ + ": " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ }
+
+ /**
+ * Utility function to ensure that the configured base znode exists.
+ * This recursively creates the znode as well as all of its parents.
+ */
+ private void createRootDirRecursively(final String path) throws Exception {
+ String pathParts[] = path.split("/");
+ Preconditions.checkArgument(
+ pathParts.length >= 1 && pathParts[0].isEmpty(),
+ "Invalid path: %s", path);
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 1; i < pathParts.length; i++) {
+ sb.append("/").append(pathParts[i]);
+ create(sb.toString());
+ }
+ }
+
+ /**
+ * Create a ZNode.
+ * @param path Path of the ZNode.
+ * @return If the ZNode was created.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ private boolean create(final String path) throws Exception {
+ LOG.info("Creating {}", path);
+ boolean created = false;
+ if (!exists(path)) {
+ curatorFramework.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(path, null);
+ created = true;
+ }
+ return created;
+ }
+
+ /**
+ * Delete a ZNode.
+ * @param path Path of the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ private void delete(final String path) throws Exception {
+ if (exists(path)) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+ }
+
+ /**
+ * Get children of a ZNode.
+ * @param path Path of the ZNode.
+ * @return The list of children.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ private List getChildren(final String path) throws Exception {
+ return curatorFramework.getChildren().forPath(path);
+ }
+
+ /**
+ * Check if a ZNode exists.
+ *
+ * @param path Path of the ZNode.
+ * @return If the ZNode exists.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ private boolean exists(final String path) throws Exception {
+ return curatorFramework.checkExists().forPath(path) != null;
+ }
+
+ /**
+ * Get the data in a ZNode.
+ * @param path Path of the ZNode.
+ * @param stat Output statistics of the ZNode.
+ * @return The data in the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ private byte[] getData(final String path, Stat stat) throws Exception {
+ return curatorFramework.getData().storingStatIn(stat).forPath(path);
+ }
+
+ /**
+ * Set data into a ZNode.
+ * @param path Path of the ZNode.
+ * @param data Data to set.
+ * @param version Version of the data to store.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ private void setData(String path, byte[] data, int version) throws Exception {
+ curatorFramework.setData().withVersion(version).forPath(path, data);
+ }
+
+ /**
+ * Get the path for a ZNode.
+ * @param root Root of the ZNode.
+ * @param nodeName Name of the ZNode.
+ * @return Path for the ZNode.
+ */
+ private String getNodePath(String root, String nodeName) {
+ return (root + "/" + nodeName);
+ }
+
+ /**
+ * Get the current time.
+ * @return Current time in milliseconds.
+ */
+ private static long getCurrentTime() {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ return cal.getTimeInMillis();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..9617df8d44c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ZookeeperFederationStateStore.
+ */
+public class TestZookeeperFederationStateStore
+ extends FederationStateStoreBaseTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestZookeeperFederationStateStore.class);
+
+ /** Zookeeper test server. */
+ private static TestingServer curatorTestingServer;
+ private static CuratorFramework curatorFramework;
+
+ @Before
+ public void before() throws IOException, YarnException {
+ try {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ String connectString = curatorTestingServer.getConnectString();
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(connectString)
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, connectString);
+ setConf(conf);
+ } catch (Exception e) {
+ LOG.error("Cannot initialize ZooKeeper store", e);
+ throw new IOException(e);
+ }
+
+ super.before();
+ }
+
+ @After
+ public void after() throws Exception {
+ super.after();
+
+ curatorFramework.close();
+ try {
+ curatorTestingServer.stop();
+ } catch (IOException e) {
+ }
+ }
+
+ @Override
+ protected FederationStateStore createStateStore() {
+ Configuration conf = new Configuration();
+ super.setConf(conf);
+ return new ZookeeperFederationStateStore();
+ }
+}
\ No newline at end of file