diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 5f8509764e8..2418f3fd116 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -130,6 +130,15 @@ + + org.apache.curator + curator-framework + + + org.apache.curator + curator-test + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java new file mode 100644 index 00000000000..a2a8743fe4d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -0,0 +1,807 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.List; +import java.util.TimeZone; + +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ZKUtil; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; + + +/** + * ZooKeeper implementation of {@link FederationStateStore}. + */ +public class ZookeeperFederationStateStore implements FederationStateStore { + + private static final Logger LOG = + LoggerFactory.getLogger(ZookeeperFederationStateStore.class); + + /** Directory to store the state store data. */ + private String baseZNode; + + // ZooKeeper settings + private int zkSessionTimeout = 15000; + private int zkNumRetries = 1000; + private int zkRetryInterval = 1000; + private List zkAcl; + private List zkAuths; + + private CuratorFramework curatorFramework; + + private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships"; + private final static String ROOT_ZNODE_NAME_APPLICATION = "applications"; + private final static String ROOT_ZNODE_NAME_POLICY = "policies"; + + @Override + public void init(Configuration conf) throws YarnException { + LOG.info("Initializing ZooKeeper connection"); + + baseZNode = conf.get( + YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); + try { + curatorFramework = createAndStartCurator(conf); + } catch (IOException e) { + LOG.error("Cannot initialize the ZK connection", e); + } + + // Create base znode for each entity + try { + createRootDirRecursively(getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP)); + createRootDirRecursively(getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION)); + createRootDirRecursively(getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY)); + } catch (Exception e) { + String errMsg = "Cannot create base directories: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + + /** + * Create and start the ZooKeeper Curator. + * + * @param conf Configuration for the curator. + * @return Curator framework. + * @throws IOException If the initialization failed. + */ + private CuratorFramework createAndStartCurator(Configuration conf) + throws IOException { + + String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); + if (zkHostPort == null) { + throw new RuntimeException( + YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); + } + + // Setup ZK auths + this.zkAuths = getZKAuths(conf); + List authInfos = new ArrayList<>(); + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); + } + + this.zkAcl = getZKAcls(conf); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkHostPort) + .sessionTimeoutMs(zkSessionTimeout) + .retryPolicy(new RetryNTimes(zkNumRetries, zkRetryInterval)) + .authorization(authInfos).build(); + client.start(); + return client; + } + + /** + * Get the ZK authorization from the configuration. + * @param conf Configuration. + * @return List of ZK authorizations. + * @throws IOException If the Zookeeper ACLs configuration file + * cannot be read + */ + private List getZKAuths(Configuration conf) + throws IOException { + try { + String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); + zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); + if (zkAuthConf != null) { + return ZKUtil.parseAuth(zkAuthConf); + } else { + return Collections.emptyList(); + } + } catch (IOException | ZKUtil.BadAuthFormatException e) { + LOG.error("Couldn't read Auth based on " + + YarnConfiguration.RM_ZK_AUTH); + throw e; + } + } + + /** + * Get the ZK ACLs from the configuration. + * @param conf Configuration. + * @return List of ACLs. + * @throws IOException + */ + private static List getZKAcls(Configuration conf) throws IOException { + // Parse authentication from configuration. + String zkAclConf = conf.get( + YarnConfiguration.RM_ZK_ACL, + YarnConfiguration.DEFAULT_RM_ZK_ACL); + try { + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + return ZKUtil.parseACLs(zkAclConf); + } catch (IOException | ZKUtil.BadAclFormatException e) { + LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); + throw e; + } + } + + @Override + public void close() throws Exception { + if (curatorFramework != null) { + curatorFramework.close(); + } + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); + ApplicationId appId = app.getApplicationId(); + + // Try to write the subcluster + SubClusterId homeSubCluster = app.getHomeSubCluster(); + try { + putApp(appId, homeSubCluster, false); + } catch (Exception e) { + String errMsg = "Cannot add application home subcluster for " + appId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + // Check for the actual subcluster + try { + homeSubCluster = getApp(appId); + } catch (Exception e) { + String errMsg = "Cannot check app home subcluster for " + appId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return AddApplicationHomeSubClusterResponse + .newInstance(homeSubCluster); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); + ApplicationId appId = app.getApplicationId(); + SubClusterId homeSubCluster = getApp(appId); + if (homeSubCluster == null) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + SubClusterId newSubClusterId = + request.getApplicationHomeSubCluster().getHomeSubCluster(); + putApp(appId, newSubClusterId, true); + return UpdateApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = request.getApplicationId(); + SubClusterId homeSubCluster = getApp(appId); + if (homeSubCluster == null) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetApplicationHomeSubClusterResponse.newInstance( + ApplicationHomeSubCluster.newInstance(appId, homeSubCluster)); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + List result = new ArrayList<>(); + + String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); + try { + for (String child : getChildren(appsZNode)) { + ApplicationId appId = ApplicationId.fromString(child); + SubClusterId homeSubCluster = getApp(appId); + result.add(ApplicationHomeSubCluster.newInstance(appId, homeSubCluster)); + } + } catch (Exception e) { + String errMsg = "Cannot get apps: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return GetApplicationsHomeSubClusterResponse.newInstance(result); + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = request.getApplicationId(); + String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); + String appZNode = getNodePath(appsZNode, appId.toString()); + + boolean exists = false; + try { + exists = exists(appZNode); + } catch (Exception e) { + String errMsg = "Cannot check app: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (!exists) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + try { + delete(appZNode); + } catch (Exception e) { + String errMsg = "Cannot delete app: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return DeleteApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterInfo subClusterInfo = request.getSubClusterInfo(); + SubClusterId subclusterId = subClusterInfo.getSubClusterId(); + + // Update the heartbeat time + long currentTime = getCurrentTime(); + subClusterInfo.setLastHeartBeat(currentTime); + + try { + putSubclusterInfo(subclusterId, subClusterInfo, true); + } catch (Exception e) { + String errMsg = "Cannot register subcluster: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return SubClusterRegisterResponse.newInstance(); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterId subClusterId = request.getSubClusterId(); + SubClusterState state = request.getState(); + + // Get the current information and update it + SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId); + if (subClusterInfo == null) { + String errMsg = "SubCluster " + subClusterId + " not found"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } else { + subClusterInfo.setState(state); + putSubclusterInfo(subClusterId, subClusterInfo, true); + } + + return SubClusterDeregisterResponse.newInstance(); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest request) throws YarnException { + + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterId subClusterId = request.getSubClusterId(); + + SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId); + if (subClusterInfo == null) { + String errMsg = "SubCluster " + subClusterId + + " does not exist; cannot heartbeat"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + long currentTime = getCurrentTime(); + subClusterInfo.setLastHeartBeat(currentTime); + subClusterInfo.setState(request.getState()); + subClusterInfo.setCapability(request.getCapability()); + + putSubclusterInfo(subClusterId, subClusterInfo, true); + + return SubClusterHeartbeatResponse.newInstance(); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest request) throws YarnException { + + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterId subClusterId = request.getSubClusterId(); + SubClusterInfo subClusterInfo = null; + try { + subClusterInfo = getSubclusterInfo(subClusterId); + if (subClusterInfo == null) { + LOG.warn("The queried SubCluster: {} does not exist.", subClusterId); + return null; + } + } catch (Exception e) { + String errMsg = "Cannot get subcluster: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetSubClusterInfoResponse.newInstance(subClusterInfo); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest request) throws YarnException { + List result = new ArrayList<>(); + + String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP); + try { + for (String child : getChildren(membershipZNode)) { + SubClusterId subClusterId = SubClusterId.newInstance(child); + SubClusterInfo info = getSubclusterInfo(subClusterId); + if (!request.getFilterInactiveSubClusters() || + info.getState().isActive()) { + result.add(info); + } + } + } catch (Exception e) { + String errMsg = "Cannot get subclusters: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetSubClustersInfoResponse.newInstance(result); + } + + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + + FederationPolicyStoreInputValidator.validate(request); + String queue = request.getQueue(); + SubClusterPolicyConfiguration policy = null;; + try { + policy = getPolicy(queue); + } catch (Exception e) { + String errMsg = "Cannot get policy: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + if (policy == null) { + LOG.warn("Policy for queue: {} does not exist.", queue); + return null; + } + return GetSubClusterPolicyConfigurationResponse + .newInstance(policy); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + + FederationPolicyStoreInputValidator.validate(request); + SubClusterPolicyConfiguration policy = + request.getPolicyConfiguration(); + try { + String queue = policy.getQueue(); + putPolicy(queue, policy, true); + } catch (Exception e) { + String errMsg = "Cannot set policy: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return SetSubClusterPolicyConfigurationResponse.newInstance(); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + List result = new ArrayList<>(); + + String policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY); + try { + for (String child : getChildren(policiesZNode)) { + SubClusterPolicyConfiguration policy = getPolicy(child); + result.add(policy); + } + } catch (Exception e) { + String errMsg = "Cannot get policies: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetSubClusterPoliciesConfigurationsResponse.newInstance(result); + } + + @Override + public Version getCurrentVersion() { + return null; + } + + @Override + public Version loadVersion() { + return null; + } + + /** + * Get the subcluster for an application. + * @param appId Application identifier. + * @return Subcluster identifier. + * @throws Exception If it cannot contact ZooKeeper. + */ + private SubClusterId getApp(final ApplicationId appId) throws YarnException { + String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); + String appZNode = getNodePath(appsZNode, appId.toString()); + + SubClusterId subClusterId = null; + byte[] data = get(appZNode); + if (data != null) { + try { + subClusterId = new SubClusterIdPBImpl( + SubClusterIdProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse application at " + appZNode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + return subClusterId; + } + + /** + * Put an application. + * @param appId Application identifier. + * @param subClusterId Subcluster identifier. + * @throws Exception If it cannot contact ZooKeeper. + */ + private void putApp(final ApplicationId appId, + final SubClusterId subClusterId, boolean update) + throws YarnException { + String appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); + String appZNode = getNodePath(appsZNode, appId.toString()); + SubClusterIdProto proto = + ((SubClusterIdPBImpl)subClusterId).getProto(); + byte[] data = proto.toByteArray(); + put(appZNode, data, update); + } + + /** + * Get the current information for a subcluster from Zookeeper. + * @param subclusterId Subcluster identifier. + * @return Subcluster information or null if it doesn't exist. + * @throws Exception If it cannot contact ZooKeeper. + */ + private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId) + throws YarnException { + String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP); + String memberZNode = getNodePath(membershipZNode, subclusterId.toString()); + + SubClusterInfo policy = null; + byte[] data = get(memberZNode); + if (data != null) { + try { + policy = new SubClusterInfoPBImpl( + SubClusterInfoProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse subcluster info at " + memberZNode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + return policy; + } + + /** + * Put the subcluster information in Zookeeper. + * @param subclusterId Subcluster identifier. + * @param subClusterInfo Subcluster information. + * @throws Exception If it cannot contact ZooKeeper. + */ + private void putSubclusterInfo(final SubClusterId subclusterId, + final SubClusterInfo subClusterInfo, final boolean update) + throws YarnException { + String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP); + String memberZNode = getNodePath(membershipZNode, subclusterId.toString()); + SubClusterInfoProto proto = + ((SubClusterInfoPBImpl)subClusterInfo).getProto(); + byte[] data = proto.toByteArray(); + put(memberZNode, data, update); + } + + /** + * Get the queue policy from Zookeeper. + * @param queue Name of the queue. + * @return Subcluster policy configuration. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private SubClusterPolicyConfiguration getPolicy(final String queue) + throws YarnException { + String policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY); + String policyZNode = getNodePath(policiesZNode, queue); + + SubClusterPolicyConfiguration policy = null; + byte[] data = get(policyZNode); + if (data != null) { + try { + policy = new SubClusterPolicyConfigurationPBImpl( + SubClusterPolicyConfigurationProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse policy at " + policyZNode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + return policy; + } + + /** + * Put the subcluster information in Zookeeper. + * @param queue Name of the queue. + * @param policy Subcluster policy configuration. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private void putPolicy(final String queue, + final SubClusterPolicyConfiguration policy, boolean update) + throws YarnException { + String membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY); + String memberZNode = getNodePath(membershipZNode, queue); + + SubClusterPolicyConfigurationProto proto = + ((SubClusterPolicyConfigurationPBImpl)policy).getProto(); + byte[] data = proto.toByteArray(); + put(memberZNode, data, update); + } + + /** + * Get data from a znode in Zookeeper. + * @param znode Path of the znode. + * @return Data in the znode. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private byte[] get(String znode) throws YarnException { + boolean exists = false; + try { + exists = exists(znode); + } catch (Exception e) { + String errMsg = "Cannot find znode " + znode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (!exists) { + LOG.error("{} does not exist", znode); + return null; + } + + Stat stat = new Stat(); + byte[] data = null; + try { + data = getData(znode, stat); + } catch (Exception e) { + String errMsg = "Cannot get data from znode " + znode + + ": " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return data; + } + + /** + * Put data into a znode in Zookeeper. + * @param znode Path of the znode. + * @param data Data to write. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private void put(String znode, byte[] data, boolean update) + throws YarnException { + // Create the znode + boolean created = false; + try { + created = create(znode); + } catch (Exception e) { + String errMsg = "Cannot create znode " + znode + ": " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (!created) { + LOG.debug("{} not created", znode); + if (!update) { + LOG.info("{} already existed and we are not updating", znode); + return; + } + } + + // Write the data into the znode + try { + setData(znode, data, -1); + } catch (Exception e) { + String errMsg = "Cannot write data into znode " + znode + + ": " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + + /** + * Utility function to ensure that the configured base znode exists. + * This recursively creates the znode as well as all of its parents. + */ + private void createRootDirRecursively(final String path) throws Exception { + String pathParts[] = path.split("/"); + Preconditions.checkArgument( + pathParts.length >= 1 && pathParts[0].isEmpty(), + "Invalid path: %s", path); + StringBuilder sb = new StringBuilder(); + + for (int i = 1; i < pathParts.length; i++) { + sb.append("/").append(pathParts[i]); + create(sb.toString()); + } + } + + /** + * Create a ZNode. + * @param path Path of the ZNode. + * @return If the ZNode was created. + * @throws Exception If it cannot contact Zookeeper. + */ + private boolean create(final String path) throws Exception { + LOG.info("Creating {}", path); + boolean created = false; + if (!exists(path)) { + curatorFramework.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl) + .forPath(path, null); + created = true; + } + return created; + } + + /** + * Delete a ZNode. + * @param path Path of the ZNode. + * @throws Exception If it cannot contact Zookeeper. + */ + private void delete(final String path) throws Exception { + if (exists(path)) { + curatorFramework.delete().deletingChildrenIfNeeded().forPath(path); + } + } + + /** + * Get children of a ZNode. + * @param path Path of the ZNode. + * @return The list of children. + * @throws Exception If it cannot contact Zookeeper. + */ + private List getChildren(final String path) throws Exception { + return curatorFramework.getChildren().forPath(path); + } + + /** + * Check if a ZNode exists. + * + * @param path Path of the ZNode. + * @return If the ZNode exists. + * @throws Exception If it cannot contact Zookeeper. + */ + private boolean exists(final String path) throws Exception { + return curatorFramework.checkExists().forPath(path) != null; + } + + /** + * Get the data in a ZNode. + * @param path Path of the ZNode. + * @param stat Output statistics of the ZNode. + * @return The data in the ZNode. + * @throws Exception If it cannot contact Zookeeper. + */ + private byte[] getData(final String path, Stat stat) throws Exception { + return curatorFramework.getData().storingStatIn(stat).forPath(path); + } + + /** + * Set data into a ZNode. + * @param path Path of the ZNode. + * @param data Data to set. + * @param version Version of the data to store. + * @throws Exception If it cannot contact Zookeeper. + */ + private void setData(String path, byte[] data, int version) throws Exception { + curatorFramework.setData().withVersion(version).forPath(path, data); + } + + /** + * Get the path for a ZNode. + * @param root Root of the ZNode. + * @param nodeName Name of the ZNode. + * @return Path for the ZNode. + */ + private String getNodePath(String root, String nodeName) { + return (root + "/" + nodeName); + } + + /** + * Get the current time. + * @return Current time in milliseconds. + */ + private static long getCurrentTime() { + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + return cal.getTimeInMillis(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java new file mode 100644 index 00000000000..9617df8d44c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.io.IOException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit tests for ZookeeperFederationStateStore. + */ +public class TestZookeeperFederationStateStore + extends FederationStateStoreBaseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(TestZookeeperFederationStateStore.class); + + /** Zookeeper test server. */ + private static TestingServer curatorTestingServer; + private static CuratorFramework curatorFramework; + + @Before + public void before() throws IOException, YarnException { + try { + curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + String connectString = curatorTestingServer.getConnectString(); + curatorFramework = CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + curatorFramework.start(); + + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, connectString); + setConf(conf); + } catch (Exception e) { + LOG.error("Cannot initialize ZooKeeper store", e); + throw new IOException(e); + } + + super.before(); + } + + @After + public void after() throws Exception { + super.after(); + + curatorFramework.close(); + try { + curatorTestingServer.stop(); + } catch (IOException e) { + } + } + + @Override + protected FederationStateStore createStateStore() { + Configuration conf = new Configuration(); + super.setConf(conf); + return new ZookeeperFederationStateStore(); + } +} \ No newline at end of file