diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
new file mode 100644
index 00000000000..e05884499fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * ZooKeeper implementation of {@link FederationStateStore}.
+ */
+public class ZookeeperFederationStateStore implements FederationStateStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
+
+ /** Directory to store the state store data. */
+ private String baseZNode;
+
+ // ZooKeeper settings
+ private int zkSessionTimeout = 15000;
+ private int zkNumRetries = 1000;
+ private int zkRetryInterval = 1000;
+ private List zkAcl;
+ private List zkAuths;
+
+ private CuratorFramework curatorFramework;
+
+ private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
+ private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
+ private final static String ROOT_ZNODE_NAME_POLICY = "policies";
+
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ LOG.info("Initializing ZooKeeper connection");
+
+ baseZNode = conf.get(
+ YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
+ try {
+ curatorFramework = createAndStartCurator(conf);
+ } catch (IOException e) {
+ LOG.error("Cannot initialize the ZK connection", e);
+ }
+ }
+
+ /**
+ * Create and start the ZooKeeper Curator.
+ *
+ * @param conf Configuration for the curator.
+ * @return Curator framework.
+ * @throws IOException If the initialization failed.
+ */
+ private CuratorFramework createAndStartCurator(Configuration conf)
+ throws IOException {
+
+ String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+ if (zkHostPort == null) {
+ throw new RuntimeException(
+ YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
+ }
+
+ // Setup ZK auths
+ this.zkAuths = getZKAuths(conf);
+ List authInfos = new ArrayList();
+ for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+ authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+ }
+
+ this.zkAcl = getZKAcls(conf);
+
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkHostPort)
+ .sessionTimeoutMs(zkSessionTimeout)
+ .retryPolicy(new RetryNTimes(zkNumRetries, zkRetryInterval))
+ .authorization(authInfos).build();
+ client.start();
+ return client;
+ }
+
+ /**
+ * Get the ZK authorization from the configuration.
+ *
+ * @param conf Configuration.
+ * @return List of ZK authorizations.
+ * @throws IOException If the Zookeeper ACLs configuration file
+ * cannot be read
+ */
+ private List getZKAuths(Configuration conf)
+ throws IOException {
+ try {
+ String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
+ zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
+ if (zkAuthConf != null) {
+ return ZKUtil.parseAuth(zkAuthConf);
+ } else {
+ return Collections.emptyList();
+ }
+ } catch (IOException | ZKUtil.BadAuthFormatException e) {
+ LOG.error("Couldn't read Auth based on " +
+ YarnConfiguration.RM_ZK_AUTH);
+ throw e;
+ }
+ }
+
+ /**
+ * Get the ZK ACLs from the configuration.
+ *
+ * @param conf Configuration.
+ * @return List of ACLs.
+ * @throws IOException
+ */
+ private static List getZKAcls(Configuration conf) throws IOException {
+ // Parse authentication from configuration.
+ String zkAclConf = conf.get(
+ YarnConfiguration.RM_ZK_ACL,
+ YarnConfiguration.DEFAULT_RM_ZK_ACL);
+ try {
+ zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+ return ZKUtil.parseACLs(zkAclConf);
+ } catch (IOException | ZKUtil.BadAclFormatException e) {
+ LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (curatorFramework != null) {
+ curatorFramework.close();
+ }
+ }
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationId appId =
+ request.getApplicationHomeSubCluster().getApplicationId();
+
+ String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ String appZNode = getNodePath(appsZNode, appId.toString());
+
+ SubClusterId homeSubCluster = request.getApplicationHomeSubCluster().getHomeSubCluster();
+ try {
+ if (!exists(appZNode)) {
+ setData(appZNode, homeSubCluster.toString().getBytes(), -1);
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot add application home subcluster", e);
+ throw new YarnException("Cannot add application home subcluster", e);
+ }
+
+ return AddApplicationHomeSubClusterResponse
+ .newInstance(homeSubCluster);
+ }
+
+ @Override
+ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public SubClusterRegisterResponse registerSubCluster(
+ SubClusterRegisterRequest registerSubClusterRequest)
+ throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public SubClusterDeregisterResponse deregisterSubCluster(
+ SubClusterDeregisterRequest subClusterDeregisterRequest)
+ throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
+ SubClusterHeartbeatRequest subClusterHeartbeatRequest)
+ throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetSubClusterInfoResponse getSubCluster(
+ GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetSubClustersInfoResponse getSubClusters(
+ GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Version getCurrentVersion() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Version loadVersion() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+ /**
+ * Utility function to ensure that the configured base znode exists.
+ * This recursively creates the znode as well as all of its parents.
+ */
+ private void createRootDirRecursively(String path) throws Exception {
+ String pathParts[] = path.split("/");
+ Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
+ "Invalid path: %s", path);
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 1; i < pathParts.length; i++) {
+ sb.append("/").append(pathParts[i]);
+ create(sb.toString());
+ }
+ }
+
+ /**
+ * Create a ZNode.
+ *
+ * @param path Path of the ZNode.
+ * @return If the ZNode was created.
+ * @throws Exception
+ */
+ private boolean create(final String path) throws Exception {
+ boolean created = false;
+ if (!exists(path)) {
+ curatorFramework.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(path, null);
+ created = true;
+ }
+ return created;
+ }
+
+ /**
+ * Delete a ZNode.
+ *
+ * @param path Path of the ZNode.
+ * @throws Exception
+ */
+ private void delete(final String path) throws Exception {
+ if (exists(path)) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+ }
+
+ /**
+ * Get children of a ZNode.
+ *
+ * @param path Path of the ZNode.
+ * @return The list of children.
+ * @throws Exception
+ */
+ private List getChildren(final String path) throws Exception {
+ return curatorFramework.getChildren().forPath(path);
+ }
+
+ /**
+ * Check if a ZNode exists.
+ *
+ * @param path Path of the ZNode.
+ * @return If the ZNode exists.
+ * @throws Exception
+ */
+ private boolean exists(final String path) throws Exception {
+ return curatorFramework.checkExists().forPath(path) != null;
+ }
+
+ /**
+ * Get the data in a ZNode.
+ *
+ * @param path Path of the ZNode.
+ * @param stat Output statistics of the ZNode.
+ * @return The data in the ZNode.
+ * @throws Exception
+ */
+ private String getData(final String path, Stat stat) throws Exception {
+ byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(path);
+ return new String(data);
+ }
+
+ /**
+ * Set data into a ZNode.
+ *
+ * @param path Path of the ZNode.
+ * @param data Data to set.
+ * @param version Version of the data to store.
+ * @throws Exception
+ */
+ private void setData(String path, byte[] data, int version) throws Exception {
+ curatorFramework.setData().withVersion(version).forPath(path, data);
+ }
+
+ /**
+ * Get the path for a ZNode.
+ *
+ * @param root Root of the ZNode.
+ * @param nodeName Name of the ZNode.
+ * @return PAth for the ZNode.
+ */
+ private String getNodePath(String root, String nodeName) {
+ return (root + "/" + nodeName);
+ }
+}