diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java new file mode 100644 index 00000000000..e05884499fa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ZKUtil; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + + +/** + * ZooKeeper implementation of {@link FederationStateStore}. + */ +public class ZookeeperFederationStateStore implements FederationStateStore { + + private static final Logger LOG = + LoggerFactory.getLogger(ZookeeperFederationStateStore.class); + + /** Directory to store the state store data. */ + private String baseZNode; + + // ZooKeeper settings + private int zkSessionTimeout = 15000; + private int zkNumRetries = 1000; + private int zkRetryInterval = 1000; + private List zkAcl; + private List zkAuths; + + private CuratorFramework curatorFramework; + + private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships"; + private final static String ROOT_ZNODE_NAME_APPLICATION = "applications"; + private final static String ROOT_ZNODE_NAME_POLICY = "policies"; + + @Override + public void init(Configuration conf) throws YarnException { + LOG.info("Initializing ZooKeeper connection"); + + baseZNode = conf.get( + YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); + try { + curatorFramework = createAndStartCurator(conf); + } catch (IOException e) { + LOG.error("Cannot initialize the ZK connection", e); + } + } + + /** + * Create and start the ZooKeeper Curator. + * + * @param conf Configuration for the curator. + * @return Curator framework. + * @throws IOException If the initialization failed. + */ + private CuratorFramework createAndStartCurator(Configuration conf) + throws IOException { + + String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); + if (zkHostPort == null) { + throw new RuntimeException( + YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); + } + + // Setup ZK auths + this.zkAuths = getZKAuths(conf); + List authInfos = new ArrayList(); + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); + } + + this.zkAcl = getZKAcls(conf); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkHostPort) + .sessionTimeoutMs(zkSessionTimeout) + .retryPolicy(new RetryNTimes(zkNumRetries, zkRetryInterval)) + .authorization(authInfos).build(); + client.start(); + return client; + } + + /** + * Get the ZK authorization from the configuration. + * + * @param conf Configuration. + * @return List of ZK authorizations. + * @throws IOException If the Zookeeper ACLs configuration file + * cannot be read + */ + private List getZKAuths(Configuration conf) + throws IOException { + try { + String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); + zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); + if (zkAuthConf != null) { + return ZKUtil.parseAuth(zkAuthConf); + } else { + return Collections.emptyList(); + } + } catch (IOException | ZKUtil.BadAuthFormatException e) { + LOG.error("Couldn't read Auth based on " + + YarnConfiguration.RM_ZK_AUTH); + throw e; + } + } + + /** + * Get the ZK ACLs from the configuration. + * + * @param conf Configuration. + * @return List of ACLs. + * @throws IOException + */ + private static List getZKAcls(Configuration conf) throws IOException { + // Parse authentication from configuration. + String zkAclConf = conf.get( + YarnConfiguration.RM_ZK_ACL, + YarnConfiguration.DEFAULT_RM_ZK_ACL); + try { + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + return ZKUtil.parseACLs(zkAclConf); + } catch (IOException | ZKUtil.BadAclFormatException e) { + LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); + throw e; + } + } + + @Override + public void close() throws Exception { + if (curatorFramework != null) { + curatorFramework.close(); + } + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + + String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); + String appZNode = getNodePath(appsZNode, appId.toString()); + + SubClusterId homeSubCluster = request.getApplicationHomeSubCluster().getHomeSubCluster(); + try { + if (!exists(appZNode)) { + setData(appZNode, homeSubCluster.toString().getBytes(), -1); + } + } catch (Exception e) { + LOG.error("Cannot add application home subcluster", e); + throw new YarnException("Cannot add application home subcluster", e); + } + + return AddApplicationHomeSubClusterResponse + .newInstance(homeSubCluster); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest registerSubClusterRequest) + throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest subClusterDeregisterRequest) + throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest subClusterHeartbeatRequest) + throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest subClusterRequest) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest subClustersRequest) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Version getCurrentVersion() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Version loadVersion() { + // TODO Auto-generated method stub + return null; + } + + + /** + * Utility function to ensure that the configured base znode exists. + * This recursively creates the znode as well as all of its parents. + */ + private void createRootDirRecursively(String path) throws Exception { + String pathParts[] = path.split("/"); + Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(), + "Invalid path: %s", path); + StringBuilder sb = new StringBuilder(); + + for (int i = 1; i < pathParts.length; i++) { + sb.append("/").append(pathParts[i]); + create(sb.toString()); + } + } + + /** + * Create a ZNode. + * + * @param path Path of the ZNode. + * @return If the ZNode was created. + * @throws Exception + */ + private boolean create(final String path) throws Exception { + boolean created = false; + if (!exists(path)) { + curatorFramework.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl) + .forPath(path, null); + created = true; + } + return created; + } + + /** + * Delete a ZNode. + * + * @param path Path of the ZNode. + * @throws Exception + */ + private void delete(final String path) throws Exception { + if (exists(path)) { + curatorFramework.delete().deletingChildrenIfNeeded().forPath(path); + } + } + + /** + * Get children of a ZNode. + * + * @param path Path of the ZNode. + * @return The list of children. + * @throws Exception + */ + private List getChildren(final String path) throws Exception { + return curatorFramework.getChildren().forPath(path); + } + + /** + * Check if a ZNode exists. + * + * @param path Path of the ZNode. + * @return If the ZNode exists. + * @throws Exception + */ + private boolean exists(final String path) throws Exception { + return curatorFramework.checkExists().forPath(path) != null; + } + + /** + * Get the data in a ZNode. + * + * @param path Path of the ZNode. + * @param stat Output statistics of the ZNode. + * @return The data in the ZNode. + * @throws Exception + */ + private String getData(final String path, Stat stat) throws Exception { + byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(path); + return new String(data); + } + + /** + * Set data into a ZNode. + * + * @param path Path of the ZNode. + * @param data Data to set. + * @param version Version of the data to store. + * @throws Exception + */ + private void setData(String path, byte[] data, int version) throws Exception { + curatorFramework.setData().withVersion(version).forPath(path, data); + } + + /** + * Get the path for a ZNode. + * + * @param root Root of the ZNode. + * @param nodeName Name of the ZNode. + * @return PAth for the ZNode. + */ + private String getNodePath(String root, String nodeName) { + return (root + "/" + nodeName); + } +}