diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d608df899fd..922503c3994 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2575,7 +2575,7 @@ public static boolean isAclEnabled(Configuration conf) { FEDERATION_PREFIX + "state-store.class"; public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS = - "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore"; + "org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore"; public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; @@ -2614,6 +2614,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = ""; + public static final String FEDERATION_STATESTORE_ZK_PREFIX = + FEDERATION_PREFIX + "zk-state-store."; + /** Parent znode path under which ZKRMStateStore will create znodes */ + public static final String FEDERATION_STATESTORE_RM_PARENT_PATH = + FEDERATION_STATESTORE_ZK_PREFIX + "parent-path"; + public static final String DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH = + "/federationstore"; + private static final String FEDERATION_STATESTORE_SQL_PREFIX = FEDERATION_PREFIX + "state-store.sql."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 5f8509764e8..2418f3fd116 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -130,6 +130,15 @@ + + org.apache.curator + curator-framework + + + org.apache.curator + curator-test + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java new file mode 100644 index 00000000000..78bbead9fe6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import static org.apache.hadoop.yarn.server.utils.YarnZKUtils.getNodePath; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; 
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.utils.YarnZKUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; + + +/** + * ZooKeeper implementation of {@link FederationStateStore}. + * + * The znode structure is as follows: + * ROOT_DIR_PATH + * |--- MEMBERSHIP + * | |----- SC1 + * | |----- SC2 + * |--- APPLICATION + * | |----- APP1 + * | |----- APP2 + * |--- POLICY + * |----- QUEUE1 + * |----- QUEUE1 + */ +public class ZookeeperFederationStateStore implements FederationStateStore { + + private static final Logger LOG = + LoggerFactory.getLogger(ZookeeperFederationStateStore.class); + + private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships"; + private final static String ROOT_ZNODE_NAME_APPLICATION = "applications"; + private final static String ROOT_ZNODE_NAME_POLICY = "policies"; + + /** Interface to Zookeeper. */ + private YarnZKUtils zkUtils; + + /** Directory to store the state store data. 
*/ + private String baseZNode; + + private String appsZNode; + private String membershipZNode; + private String policiesZNode; + + @Override + public void init(Configuration conf) throws YarnException { + LOG.info("Initializing ZooKeeper connection"); + + baseZNode = conf.get( + YarnConfiguration.FEDERATION_STATESTORE_RM_PARENT_PATH, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH); + try { + this.zkUtils = new YarnZKUtils(conf); + } catch (IOException e) { + LOG.error("Cannot initialize the ZK connection", e); + } + + // Base znodes + membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP); + appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION); + policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY); + + // Create base znode for each entity + try { + createRootDirRecursively(membershipZNode); + createRootDirRecursively(appsZNode); + createRootDirRecursively(policiesZNode); + } catch (Exception e) { + String errMsg = "Cannot create base directories: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + } + + @Override + public void close() throws Exception { + if (zkUtils != null) { + zkUtils.close(); + } + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); + ApplicationId appId = app.getApplicationId(); + + // Try to write the subcluster + SubClusterId homeSubCluster = app.getHomeSubCluster(); + try { + putApp(appId, homeSubCluster, false); + } catch (Exception e) { + String errMsg = "Cannot add application home subcluster for " + appId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + // Check for the actual subcluster + try { + homeSubCluster = getApp(appId); + } catch (Exception e) { + String errMsg = "Cannot check app home subcluster for " + appId; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return AddApplicationHomeSubClusterResponse + .newInstance(homeSubCluster); + } + + @Override + public UpdateApplicationHomeSubClusterResponse + updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) + throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); + ApplicationId appId = app.getApplicationId(); + SubClusterId homeSubCluster = getApp(appId); + if (homeSubCluster == null) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + SubClusterId newSubClusterId = + request.getApplicationHomeSubCluster().getHomeSubCluster(); + putApp(appId, newSubClusterId, true); + return UpdateApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = request.getApplicationId(); + SubClusterId homeSubCluster = getApp(appId); + if (homeSubCluster == null) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return 
GetApplicationHomeSubClusterResponse.newInstance( + ApplicationHomeSubCluster.newInstance(appId, homeSubCluster)); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + List result = new ArrayList<>(); + + try { + for (String child : zkUtils.getChildren(appsZNode)) { + ApplicationId appId = ApplicationId.fromString(child); + SubClusterId homeSubCluster = getApp(appId); + ApplicationHomeSubCluster app = + ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); + result.add(app); + } + } catch (Exception e) { + String errMsg = "Cannot get apps: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return GetApplicationsHomeSubClusterResponse.newInstance(result); + } + + @Override + public DeleteApplicationHomeSubClusterResponse + deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) + throws YarnException { + + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = request.getApplicationId(); + String appZNode = getNodePath(appsZNode, appId.toString()); + + boolean exists = false; + try { + exists = zkUtils.exists(appZNode); + } catch (Exception e) { + String errMsg = "Cannot check app: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (!exists) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + try { + zkUtils.delete(appZNode); + } catch (Exception e) { + String errMsg = "Cannot delete app: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return DeleteApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterInfo subClusterInfo = request.getSubClusterInfo(); + SubClusterId subclusterId = subClusterInfo.getSubClusterId(); + + // Update the heartbeat time + long currentTime = getCurrentTime(); + subClusterInfo.setLastHeartBeat(currentTime); + + try { + putSubclusterInfo(subclusterId, subClusterInfo, true); + } catch (Exception e) { + String errMsg = "Cannot register subcluster: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return SubClusterRegisterResponse.newInstance(); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest request) throws YarnException { + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterId subClusterId = request.getSubClusterId(); + SubClusterState state = request.getState(); + + // Get the current information and update it + SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId); + if (subClusterInfo == null) { + String errMsg = "SubCluster " + subClusterId + " not found"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } else { + subClusterInfo.setState(state); + putSubclusterInfo(subClusterId, subClusterInfo, true); + } + + return SubClusterDeregisterResponse.newInstance(); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest request) throws YarnException { + + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterId subClusterId = 
request.getSubClusterId(); + + SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId); + if (subClusterInfo == null) { + String errMsg = "SubCluster " + subClusterId + + " does not exist; cannot heartbeat"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + long currentTime = getCurrentTime(); + subClusterInfo.setLastHeartBeat(currentTime); + subClusterInfo.setState(request.getState()); + subClusterInfo.setCapability(request.getCapability()); + + putSubclusterInfo(subClusterId, subClusterInfo, true); + + return SubClusterHeartbeatResponse.newInstance(); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest request) throws YarnException { + + FederationMembershipStateStoreInputValidator.validate(request); + SubClusterId subClusterId = request.getSubClusterId(); + SubClusterInfo subClusterInfo = null; + try { + subClusterInfo = getSubclusterInfo(subClusterId); + if (subClusterInfo == null) { + LOG.warn("The queried SubCluster: {} does not exist.", subClusterId); + return null; + } + } catch (Exception e) { + String errMsg = "Cannot get subcluster: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetSubClusterInfoResponse.newInstance(subClusterInfo); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest request) throws YarnException { + List result = new ArrayList<>(); + + try { + for (String child : zkUtils.getChildren(membershipZNode)) { + SubClusterId subClusterId = SubClusterId.newInstance(child); + SubClusterInfo info = getSubclusterInfo(subClusterId); + if (!request.getFilterInactiveSubClusters() || + info.getState().isActive()) { + result.add(info); + } + } + } catch (Exception e) { + String errMsg = "Cannot get subclusters: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetSubClustersInfoResponse.newInstance(result); + } + + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + + FederationPolicyStoreInputValidator.validate(request); + String queue = request.getQueue(); + SubClusterPolicyConfiguration policy = null; + try { + policy = getPolicy(queue); + } catch (Exception e) { + String errMsg = "Cannot get policy: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + if (policy == null) { + LOG.warn("Policy for queue: {} does not exist.", queue); + return null; + } + return GetSubClusterPolicyConfigurationResponse + .newInstance(policy); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + + FederationPolicyStoreInputValidator.validate(request); + SubClusterPolicyConfiguration policy = + request.getPolicyConfiguration(); + try { + String queue = policy.getQueue(); + putPolicy(queue, policy, true); + } catch (Exception e) { + String errMsg = "Cannot set policy: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return SetSubClusterPolicyConfigurationResponse.newInstance(); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + List result = new ArrayList<>(); + + try { + for (String child : zkUtils.getChildren(policiesZNode)) { + 
SubClusterPolicyConfiguration policy = getPolicy(child); + result.add(policy); + } + } catch (Exception e) { + String errMsg = "Cannot get policies: " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return GetSubClusterPoliciesConfigurationsResponse.newInstance(result); + } + + @Override + public Version getCurrentVersion() { + return null; + } + + @Override + public Version loadVersion() { + return null; + } + + /** + * Get the subcluster for an application. + * @param appId Application identifier. + * @return Subcluster identifier. + * @throws Exception If it cannot contact ZooKeeper. + */ + private SubClusterId getApp(final ApplicationId appId) throws YarnException { + String appZNode = getNodePath(appsZNode, appId.toString()); + + SubClusterId subClusterId = null; + byte[] data = get(appZNode); + if (data != null) { + try { + subClusterId = new SubClusterIdPBImpl( + SubClusterIdProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse application at " + appZNode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + return subClusterId; + } + + /** + * Put an application. + * @param appId Application identifier. + * @param subClusterId Subcluster identifier. + * @throws Exception If it cannot contact ZooKeeper. + */ + private void putApp(final ApplicationId appId, + final SubClusterId subClusterId, boolean update) + throws YarnException { + String appZNode = getNodePath(appsZNode, appId.toString()); + SubClusterIdProto proto = + ((SubClusterIdPBImpl)subClusterId).getProto(); + byte[] data = proto.toByteArray(); + put(appZNode, data, update); + } + + /** + * Get the current information for a subcluster from Zookeeper. + * @param subclusterId Subcluster identifier. + * @return Subcluster information or null if it doesn't exist. + * @throws Exception If it cannot contact ZooKeeper. + */ + private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId) + throws YarnException { + String memberZNode = getNodePath(membershipZNode, subclusterId.toString()); + + SubClusterInfo policy = null; + byte[] data = get(memberZNode); + if (data != null) { + try { + policy = new SubClusterInfoPBImpl( + SubClusterInfoProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse subcluster info at " + memberZNode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + return policy; + } + + /** + * Put the subcluster information in Zookeeper. + * @param subclusterId Subcluster identifier. + * @param subClusterInfo Subcluster information. + * @throws Exception If it cannot contact ZooKeeper. + */ + private void putSubclusterInfo(final SubClusterId subclusterId, + final SubClusterInfo subClusterInfo, final boolean update) + throws YarnException { + String memberZNode = getNodePath(membershipZNode, subclusterId.toString()); + SubClusterInfoProto proto = + ((SubClusterInfoPBImpl)subClusterInfo).getProto(); + byte[] data = proto.toByteArray(); + put(memberZNode, data, update); + } + + /** + * Get the queue policy from Zookeeper. + * @param queue Name of the queue. + * @return Subcluster policy configuration. + * @throws YarnException If it cannot contact ZooKeeper. 
+ */ + private SubClusterPolicyConfiguration getPolicy(final String queue) + throws YarnException { + String policyZNode = getNodePath(policiesZNode, queue); + + SubClusterPolicyConfiguration policy = null; + byte[] data = get(policyZNode); + if (data != null) { + try { + policy = new SubClusterPolicyConfigurationPBImpl( + SubClusterPolicyConfigurationProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse policy at " + policyZNode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + return policy; + } + + /** + * Put the policy information for a queue in Zookeeper. + * @param queue Name of the queue. + * @param policy Subcluster policy configuration. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private void putPolicy(final String queue, + final SubClusterPolicyConfiguration policy, boolean update) + throws YarnException { + String policyZNode = getNodePath(policiesZNode, queue); + + SubClusterPolicyConfigurationProto proto = + ((SubClusterPolicyConfigurationPBImpl)policy).getProto(); + byte[] data = proto.toByteArray(); + put(policyZNode, data, update); + } + + /** + * Get data from a znode in Zookeeper. + * @param znode Path of the znode. + * @return Data in the znode. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private byte[] get(String znode) throws YarnException { + boolean exists = false; + try { + exists = zkUtils.exists(znode); + } catch (Exception e) { + String errMsg = "Cannot find znode " + znode; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (!exists) { + LOG.error("{} does not exist", znode); + return null; + } + + byte[] data = null; + try { + data = zkUtils.getData(znode); + } catch (Exception e) { + String errMsg = "Cannot get data from znode " + znode + + ": " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + return data; + } + + /** + * Put data into a znode in Zookeeper. + * @param znode Path of the znode. + * @param data Data to write. + * @throws YarnException If it cannot contact ZooKeeper. + */ + private void put(String znode, byte[] data, boolean update) + throws YarnException { + // Create the znode + boolean created = false; + try { + created = zkUtils.create(znode); + } catch (Exception e) { + String errMsg = "Cannot create znode " + znode + ": " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + if (!created) { + LOG.debug("{} not created", znode); + if (!update) { + LOG.info("{} already existed and we are not updating", znode); + return; + } + } + + // Write the data into the znode + try { + zkUtils.setData(znode, data, -1); + } catch (Exception e) { + String errMsg = "Cannot write data into znode " + znode + + ": " + e.getMessage(); + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + } + + /** + * Utility function to ensure that the configured base znode exists. + * This recursively creates the znode as well as all of its parents. + */ + private void createRootDirRecursively(final String path) throws Exception { + String[] pathParts = path.split("/"); + Preconditions.checkArgument( + pathParts.length >= 1 && pathParts[0].isEmpty(), + "Invalid path: %s", path); + StringBuilder sb = new StringBuilder(); + + for (int i = 1; i < pathParts.length; i++) { + sb.append("/").append(pathParts[i]); + zkUtils.create(sb.toString()); + } + } + + /** + * Get the current time. + * @return Current time in milliseconds.
+ */ + private static long getCurrentTime() { + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + return cal.getTimeInMillis(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnZKUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnZKUtils.java new file mode 100644 index 00000000000..4d1809930dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnZKUtils.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.utils; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ZKUtil; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class that provides utility methods specific to ZK operations. + */ +@InterfaceAudience.Private +public final class YarnZKUtils { + + private static final Logger LOG = LoggerFactory.getLogger(YarnZKUtils.class); + + + private final String zkRootNodePassword = + Long.toString(new SecureRandom().nextLong()); + + /** ACL and auth info. */ + private List zkAcl; + + @VisibleForTesting + protected CuratorFramework curatorFramework; + + public YarnZKUtils(Configuration conf) throws IOException { + this.curatorFramework = createAndStartCurator(conf); + } + + public String getZkRootNodePassword() { + return zkRootNodePassword; + } + + public CuratorFramework getCurator() { + return curatorFramework; + } + + public void close() { + if (curatorFramework != null) { + curatorFramework.close(); + } + } + + /** + * Utility method to fetch the ZK ACLs from the configuration. + * + * @throws java.io.IOException if the Zookeeper ACLs configuration file + * cannot be read + */ + public static List getZKAcls(Configuration conf) throws IOException { + // Parse authentication from configuration. + String zkAclConf = + conf.get(YarnConfiguration.RM_ZK_ACL, + YarnConfiguration.DEFAULT_RM_ZK_ACL); + try { + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + return ZKUtil.parseACLs(zkAclConf); + } catch (IOException | ZKUtil.BadAclFormatException e) { + LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); + throw e; + } + } + + /** + * Utility method to fetch ZK auth info from the configuration. 
+ * + * @throws java.io.IOException if the Zookeeper ACLs configuration file + * cannot be read + */ + public static List getZKAuths(Configuration conf) + throws IOException { + String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); + try { + zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); + if (zkAuthConf != null) { + return ZKUtil.parseAuth(zkAuthConf); + } else { + return Collections.emptyList(); + } + } catch (IOException | ZKUtil.BadAuthFormatException e) { + LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH); + throw e; + } + } + + public CuratorFramework createAndStartCurator(Configuration conf) + throws IOException { + String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); + if (zkHostPort == null) { + throw new YarnRuntimeException( + YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); + } + int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES, + YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES); + int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, + YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); + + // set up zk auths + List zkAuths = getZKAuths(conf); + List authInfos = new ArrayList<>(); + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); + } + + if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance( + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) { + String zkRootNodeUsername = HAUtil + .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, conf); + byte[] defaultFencingAuth = + (zkRootNodeUsername + ":" + zkRootNodePassword) + .getBytes(Charset.forName("UTF-8")); + authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(), + defaultFencingAuth)); + } + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkHostPort) + .sessionTimeoutMs(zkSessionTimeout) + .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval)) + .authorization(authInfos).build(); + client.start(); + return client; + } + + /** + * Get ACLs for a Zn. + * @param path Path of the ZNode. + * @return + * @throws Exception + */ + public List getACL(final String path) throws Exception { + return curatorFramework.getACL().forPath(path); + } + + /** + * Get the data in a ZNode. + * @param path Path of the ZNode. + * @param stat Output statistics of the ZNode. + * @return The data in the ZNode. + * @throws Exception If it cannot contact Zookeeper. + */ + public byte[] getData(final String path) throws Exception { + return curatorFramework.getData().forPath(path); + } + + /** + * Set data into a ZNode. + * @param path Path of the ZNode. + * @param data Data to set. + * @param version Version of the data to store. + * @throws Exception If it cannot contact Zookeeper. + */ + public void setData(String path, byte[] data, int version) throws Exception { + curatorFramework.setData().withVersion(version).forPath(path, data); + } + + /** + * Get children of a ZNode. + * @param path Path of the ZNode. + * @return The list of children. + * @throws Exception If it cannot contact Zookeeper. + */ + public List getChildren(final String path) throws Exception { + return curatorFramework.getChildren().forPath(path); + } + + /** + * Check if a ZNode exists. + * + * @param path Path of the ZNode. + * @return If the ZNode exists. 
+ * @throws Exception If it cannot contact Zookeeper. + */ + public boolean exists(final String path) throws Exception { + return curatorFramework.checkExists().forPath(path) != null; + } + + /** + * Create a ZNode. + * @param path Path of the ZNode. + * @return If the ZNode was created. + * @throws Exception If it cannot contact Zookeeper. + */ + public boolean create(final String path) throws Exception { + boolean created = false; + if (!exists(path)) { + curatorFramework.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl) + .forPath(path, null); + created = true; + } + return created; + } + + /** + * Delete a ZNode. + * @param path Path of the ZNode. + * @throws Exception If it cannot contact Zookeeper. + */ + public void delete(final String path) throws Exception { + if (exists(path)) { + curatorFramework.delete().deletingChildrenIfNeeded().forPath(path); + } + } + + /** + * Get the path for a ZNode. + * @param root Root of the ZNode. + * @param nodeName Name of the ZNode. + * @return Path for the ZNode. + */ + public static String getNodePath(String root, String nodeName) { + return (root + "/" + nodeName); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java new file mode 100644 index 00000000000..9617df8d44c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.io.IOException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit tests for ZookeeperFederationStateStore. + */ +public class TestZookeeperFederationStateStore + extends FederationStateStoreBaseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(TestZookeeperFederationStateStore.class); + + /** Zookeeper test server. */ + private static TestingServer curatorTestingServer; + private static CuratorFramework curatorFramework; + + @Before + public void before() throws IOException, YarnException { + try { + curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + String connectString = curatorTestingServer.getConnectString(); + curatorFramework = CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + curatorFramework.start(); + + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, connectString); + setConf(conf); + } catch (Exception e) { + LOG.error("Cannot initialize ZooKeeper store", e); + throw new IOException(e); + } + + super.before(); + } + + @After + public void after() throws Exception { + super.after(); + + curatorFramework.close(); + try { + curatorTestingServer.stop(); + } catch (IOException e) { + } + } + + @Override + protected FederationStateStore createStateStore() { + Configuration conf = new Configuration(); + super.setConf(conf); + return new ZookeeperFederationStateStore(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java index a8dcda4f797..bfd2a021712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.utils.YarnZKUtils; import 
org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; @@ -96,8 +97,8 @@ protected void serviceInit(Configuration conf) zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); - List zkAcls = RMZKUtils.getZKAcls(conf); - List zkAuths = RMZKUtils.getZKAuths(conf); + List zkAcls = YarnZKUtils.getZKAcls(conf); + List zkAuths = YarnZKUtils.getZKAuths(conf); int maxRetryNum = conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java deleted file mode 100644 index 4b8561dae15..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ZKUtil; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.zookeeper.data.ACL; - -import java.util.Collections; -import java.util.List; - -/** - * Helper class that provides utility methods specific to ZK operations - */ -@InterfaceAudience.Private -public class RMZKUtils { - private static final Log LOG = LogFactory.getLog(RMZKUtils.class); - - /** - * Utility method to fetch the ZK ACLs from the configuration. - * - * @throws java.io.IOException if the Zookeeper ACLs configuration file - * cannot be read - */ - public static List getZKAcls(Configuration conf) throws IOException { - // Parse authentication from configuration. - String zkAclConf = - conf.get(YarnConfiguration.RM_ZK_ACL, - YarnConfiguration.DEFAULT_RM_ZK_ACL); - try { - zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); - return ZKUtil.parseACLs(zkAclConf); - } catch (IOException | ZKUtil.BadAclFormatException e) { - LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); - throw e; - } - } - - /** - * Utility method to fetch ZK auth info from the configuration. 
- * - * @throws java.io.IOException if the Zookeeper ACLs configuration file - * cannot be read - */ - public static List getZKAuths(Configuration conf) - throws IOException { - String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); - try { - zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); - if (zkAuthConf != null) { - return ZKUtil.parseAuth(zkAuthConf); - } else { - return Collections.emptyList(); - } - } catch (IOException | ZKUtil.BadAuthFormatException e) { - LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH); - throw e; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 969188529e8..d46b638e50a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.utils.YarnZKUtils; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -192,9 +193,7 @@ protected ResourceTrackerService resourceTracker; private JvmMetrics jvmMetrics; private boolean curatorEnabled = false; - private CuratorFramework curator; - private final String zkRootNodePassword = - Long.toString(new SecureRandom().nextLong()); + private YarnZKUtils zkUtils; private boolean recoveryEnabled; @VisibleForTesting @@ -345,7 +344,7 @@ protected EmbeddedElector createEmbeddedElector() throws IOException { conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { - this.curator = createAndStartCurator(conf); + this.zkUtils = new YarnZKUtils(conf); elector = new CuratorBasedElectorService(this); } else { elector = new ActiveStandbyElectorBasedElectorService(this); @@ -353,57 +352,14 @@ protected EmbeddedElector createEmbeddedElector() throws IOException { return elector; } - public CuratorFramework createAndStartCurator(Configuration conf) - throws IOException { - String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); - if (zkHostPort == null) { - throw new YarnRuntimeException( - YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); - } - int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES, - YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES); - int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, - YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); - int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); - - // set up zk auths - List zkAuths = RMZKUtils.getZKAuths(conf); - List authInfos = new ArrayList<>(); - for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { - authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth())); 
- } - - if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance( - YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) { - String zkRootNodeUsername = HAUtil - .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, conf); - byte[] defaultFencingAuth = - (zkRootNodeUsername + ":" + zkRootNodePassword) - .getBytes(Charset.forName("UTF-8")); - authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(), - defaultFencingAuth)); - } - - CuratorFramework client = CuratorFrameworkFactory.builder() - .connectString(zkHostPort) - .sessionTimeoutMs(zkSessionTimeout) - .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval)) - .authorization(authInfos).build(); - client.start(); - return client; - } - public CuratorFramework getCurator() { - return this.curator; + return this.zkUtils.getCurator(); } - public String getZkRootNodePassword() { - return this.zkRootNodePassword; + public YarnZKUtils getZKUtils() { + return this.zkUtils; } - protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, Configuration conf) { return new QueueACLsManager(scheduler, conf); @@ -1264,8 +1220,8 @@ protected void serviceStop() throws Exception { configurationProvider.close(); } super.serviceStop(); - if (curator != null) { - curator.close(); + if (zkUtils != null) { + zkUtils.close(); } transitionToStandby(false); rmContext.setHAServiceState(HAServiceState.STOPPING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1b3b367bb6c..a68f462fc33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import static org.apache.hadoop.yarn.server.utils.YarnZKUtils.getNodePath; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; @@ -46,7 +48,6 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; +import org.apache.hadoop.yarn.server.utils.YarnZKUtils; import org.apache.zookeeper.CreateMode; import 
org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -202,7 +204,7 @@
       new DigestAuthenticationProvider().getScheme();
 
   @VisibleForTesting
-  protected CuratorFramework curatorFramework;
+  protected YarnZKUtils zkUtils;
 
   /*
    * Indicates different app attempt state store operations.
@@ -258,7 +260,7 @@
         YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
     Id rmId = new Id(zkRootNodeAuthScheme,
         DigestAuthenticationProvider.generateDigest(zkRootNodeUsername + ":"
-            + resourceManager.getZkRootNodePassword()));
+            + zkUtils.getZkRootNodePassword()));
     zkRootNodeAclList.add(new ACL(CREATE_DELETE_PERMS, rmId));
     return zkRootNodeAclList;
@@ -298,7 +300,7 @@ public synchronized void initInternal(Configuration conf)
       appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
     }
-    zkAcl = RMZKUtils.getZKAcls(conf);
+    zkAcl = YarnZKUtils.getZKAcls(conf);
     if (HAUtil.isHAEnabled(conf)) {
       String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
@@ -330,10 +332,10 @@ public synchronized void initInternal(Configuration conf)
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
-    curatorFramework = resourceManager.getCurator();
+    zkUtils = resourceManager.getZKUtils();
-    if (curatorFramework == null) {
-      curatorFramework = resourceManager.createAndStartCurator(conf);
+    if (zkUtils == null) {
+      zkUtils = new YarnZKUtils(conf);
     }
   }
@@ -341,30 +343,30 @@ public synchronized void startInternal() throws Exception {
     // ensure root dirs exist
     createRootDirRecursively(znodeWorkingPath);
-    create(zkRootNodePath);
+    zkUtils.create(zkRootNodePath);
     setRootNodeAcls();
-    delete(fencingNodePath);
+    zkUtils.delete(fencingNodePath);
     if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
         .isAutomaticFailoverEnabled(getConfig())) {
       verifyActiveStatusThread = new VerifyActiveStatusThread();
       verifyActiveStatusThread.start();
     }
-    create(rmAppRoot);
-    create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
+    zkUtils.create(rmAppRoot);
+    zkUtils.create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
     for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
-      create(rmAppRootHierarchies.get(splitIndex));
+      zkUtils.create(rmAppRootHierarchies.get(splitIndex));
     }
-    create(rmDTSecretManagerRoot);
-    create(dtMasterKeysRootPath);
-    create(delegationTokensRootPath);
-    create(dtSequenceNumberPath);
-    create(amrmTokenSecretManagerRoot);
-    create(reservationRoot);
+    zkUtils.create(rmDTSecretManagerRoot);
+    zkUtils.create(dtMasterKeysRootPath);
+    zkUtils.create(delegationTokensRootPath);
+    zkUtils.create(dtSequenceNumberPath);
+    zkUtils.create(amrmTokenSecretManagerRoot);
+    zkUtils.create(reservationRoot);
   }
 
   private void logRootNodeAcls(String prefix) throws Exception {
     Stat getStat = new Stat();
-    List<ACL> getAcls = getACL(zkRootNodePath);
+    List<ACL> getAcls = zkUtils.getACL(zkRootNodePath);
     StringBuilder builder = new StringBuilder();
     builder.append(prefix);
@@ -382,6 +384,7 @@ private void setRootNodeAcls() throws Exception {
       logRootNodeAcls("Before setting ACLs'\n");
     }
+    CuratorFramework curatorFramework = zkUtils.getCurator();
     if (HAUtil.isHAEnabled(getConfig())) {
       curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
     } else {
@@ -401,7 +404,7 @@ protected synchronized void closeInternal() throws Exception {
     }
     if (!HAUtil.isHAEnabled(getConfig())) {
-      IOUtils.closeStream(curatorFramework);
+      IOUtils.closeStream(zkUtils.getCurator());
     }
   }
@@ -416,7 +419,7 @@ protected synchronized void storeVersion() throws Exception {
     byte[] data =
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
-    if (exists(versionNodePath)) {
+    if (zkUtils.exists(versionNodePath)) {
       safeSetData(versionNodePath, data, -1);
     } else {
       safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
@@ -427,8 +430,8 @@ protected synchronized void storeVersion() throws Exception {
   protected synchronized Version loadVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
-    if (exists(versionNodePath)) {
-      byte[] data = getData(versionNodePath);
+    if (zkUtils.exists(versionNodePath)) {
+      byte[] data = zkUtils.getData(versionNodePath);
       return new VersionPBImpl(VersionProto.parseFrom(data));
     }
@@ -440,9 +443,9 @@ public synchronized long getAndIncrementEpoch() throws Exception {
     String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
     long currentEpoch = baseEpoch;
-    if (exists(epochNodePath)) {
+    if (zkUtils.exists(epochNodePath)) {
       // load current epoch
-      byte[] data = getData(epochNodePath);
+      byte[] data = zkUtils.getData(epochNodePath);
       Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
       currentEpoch = epoch.getEpoch();
       // increment epoch and store it
@@ -475,7 +478,7 @@ public synchronized RMState loadState() throws Exception {
   }
 
   private void loadReservationSystemState(RMState rmState) throws Exception {
-    List<String> planNodes = getChildren(reservationRoot);
+    List<String> planNodes = zkUtils.getChildren(reservationRoot);
     for (String planName : planNodes) {
       if (LOG.isDebugEnabled()) {
@@ -483,7 +486,7 @@ private void loadReservationSystemState(RMState rmState) throws Exception {
       }
       String planNodePath = getNodePath(reservationRoot, planName);
-      List<String> reservationNodes = getChildren(planNodePath);
+      List<String> reservationNodes = zkUtils.getChildren(planNodePath);
       for (String reservationNodeName : reservationNodes) {
         String reservationNodePath =
@@ -493,7 +496,7 @@ private void loadReservationSystemState(RMState rmState) throws Exception {
           LOG.debug("Loading reservation from znode: " + reservationNodePath);
         }
-        byte[] reservationData = getData(reservationNodePath);
+        byte[] reservationData = zkUtils.getData(reservationNodePath);
         ReservationAllocationStateProto allocationState =
             ReservationAllocationStateProto.parseFrom(reservationData);
@@ -511,7 +514,7 @@ private void loadReservationSystemState(RMState rmState) throws Exception {
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
-    byte[] data = getData(amrmTokenSecretManagerRoot);
+    byte[] data = zkUtils.getData(amrmTokenSecretManagerRoot);
     if (data == null) {
       LOG.warn("There is no data saved");
@@ -533,11 +536,11 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState)
   }
 
   private void loadRMDelegationKeyState(RMState rmState) throws Exception {
-    List<String> childNodes = getChildren(dtMasterKeysRootPath);
+    List<String> childNodes = zkUtils.getChildren(dtMasterKeysRootPath);
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
-      byte[] childData = getData(childNodePath);
+      byte[] childData = zkUtils.getData(childNodePath);
       if (childData == null) {
         LOG.warn("Content of " + childNodePath + " is broken.");
@@ -562,7 +565,7 @@ private void loadRMDelegationKeyState(RMState rmState) throws Exception {
   }
 
   private void loadRMSequentialNumberState(RMState rmState) throws Exception {
-    byte[] seqData = getData(dtSequenceNumberPath);
+    byte[] seqData = zkUtils.getData(dtSequenceNumberPath);
     if (seqData != null) {
       ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
@@ -575,12 +578,12 @@ private void loadRMSequentialNumberState(RMState rmState) throws Exception {
   private void loadRMDelegationTokenState(RMState rmState) throws Exception {
     List<String> childNodes =
-        getChildren(delegationTokensRootPath);
+        zkUtils.getChildren(delegationTokensRootPath);
     for (String childNodeName : childNodes) {
       String childNodePath =
           getNodePath(delegationTokensRootPath, childNodeName);
-      byte[] childData = getData(childNodePath);
+      byte[] childData = zkUtils.getData(childNodePath);
       if (childData == null) {
         LOG.warn("Content of " + childNodePath + " is broken.");
@@ -611,7 +614,7 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
   private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
       String appIdStr) throws Exception {
-    byte[] appData = getData(appNodePath);
+    byte[] appData = zkUtils.getData(appNodePath);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Loading application from znode: " + appNodePath);
     }
@@ -633,7 +636,7 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
       if (appRoot == null) {
        continue;
       }
-      List<String> childNodes = getChildren(appRoot);
+      List<String> childNodes = zkUtils.getChildren(appRoot);
       boolean appNodeFound = false;
       for (String childNodeName : childNodes) {
         if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -644,7 +647,7 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
         } else {
           // If AppId Node is partitioned.
           String parentNodePath = getNodePath(appRoot, childNodeName);
-          List<String> leafNodes = getChildren(parentNodePath);
+          List<String> leafNodes = zkUtils.getChildren(parentNodePath);
           for (String leafNodeName : leafNodes) {
             String appIdStr = childNodeName + leafNodeName;
             loadRMAppStateFromAppNode(rmState,
@@ -667,12 +670,12 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
   private void loadApplicationAttemptState(ApplicationStateData appState,
       String appPath) throws Exception {
-    List<String> attempts = getChildren(appPath);
+    List<String> attempts = zkUtils.getChildren(appPath);
     for (String attemptIDStr : attempts) {
       if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
         String attemptPath = getNodePath(appPath, attemptIDStr);
-        byte[] attemptData = getData(attemptPath);
+        byte[] attemptData = zkUtils.getData(attemptPath);
         ApplicationAttemptStateDataPBImpl attemptState =
             new ApplicationAttemptStateDataPBImpl(
@@ -709,7 +712,7 @@ private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
     String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
     List<String> children = null;
     try {
-      children = getChildren(parentAppNode);
+      children = zkUtils.getChildren(parentAppNode);
     } catch (KeeperException.NoNodeException ke) {
       // It should be fine to swallow this exception as the parent app node we
       // intend to delete is already deleted.
@@ -770,7 +773,7 @@ protected synchronized void updateApplicationStateInternal(
     boolean pathExists = true;
     // Look for paths based on other split indices if path as per split index
     // does not exist.
-    if (!exists(nodeUpdatePath)) {
+    if (!zkUtils.exists(nodeUpdatePath)) {
       AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
       if (alternatePathInfo != null) {
         nodeUpdatePath = alternatePathInfo.path;
@@ -780,7 +783,7 @@ protected synchronized void updateApplicationStateInternal(
       if (appIdNodeSplitIndex != 0) {
         String rootNode =
             getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
-        if (!exists(rootNode)) {
+        if (!zkUtils.exists(rootNode)) {
           safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
         }
       }
@@ -816,7 +819,7 @@ private void handleApplicationAttemptStateOp(
     String appId = appAttemptId.getApplicationId().toString();
     String appDirPath = getLeafAppIdNodePath(appId, false);
     // Look for paths based on other split indices.
-    if (!exists(appDirPath)) {
+    if (!zkUtils.exists(appDirPath)) {
       AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
       if (alternatePathInfo == null) {
         if (operation == AppAttemptOp.REMOVE) {
@@ -839,7 +842,7 @@ private void handleApplicationAttemptStateOp(
     }
     switch (operation) {
     case UPDATE:
-      if (exists(path)) {
+      if (zkUtils.exists(path)) {
         safeSetData(path, attemptStateData, -1);
       } else {
         safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
@@ -912,7 +915,7 @@ private void removeApp(String removeAppId, boolean safeRemove,
     int splitIndex = appIdNodeSplitIndex;
     // Look for paths based on other split indices if path as per configured
     // split index does not exist.
-    if (!exists(appIdRemovePath)) {
+    if (!zkUtils.exists(appIdRemovePath)) {
       AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
       if (alternatePathInfo != null) {
         appIdRemovePath = alternatePathInfo.path;
@@ -936,7 +939,7 @@ private void removeApp(String removeAppId, boolean safeRemove,
       }
       safeDelete(appIdRemovePath);
     } else {
-      curatorFramework.delete().deletingChildrenIfNeeded().
+      zkUtils.getCurator().delete().deletingChildrenIfNeeded().
           forPath(appIdRemovePath);
     }
     // Check if we should remove the parent app node as well.
@@ -976,7 +979,7 @@ protected synchronized void updateRMDelegationTokenState(
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
            + rmDTIdentifier.getSequenceNumber());
-    if (exists(nodeRemovePath)) {
+    if (zkUtils.exists(nodeRemovePath)) {
       // in case znode exists
       addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
     } else {
@@ -1056,7 +1059,7 @@ protected synchronized void removeRMDTMasterKeyState(
   @Override
   public synchronized void deleteStore() throws Exception {
-    delete(zkRootNodePath);
+    zkUtils.delete(zkRootNodePath);
   }
 
   @Override
@@ -1065,11 +1068,6 @@ public synchronized void removeApplication(ApplicationId removeAppId)
     removeApp(removeAppId.toString());
   }
 
-  @VisibleForTesting
-  String getNodePath(String root, String nodeName) {
-    return (root + "/" + nodeName);
-  }
-
   @Override
   protected synchronized void storeOrUpdateAMRMTokenSecretManagerState(
       AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
@@ -1094,7 +1092,7 @@ protected synchronized void removeReservationState(String planName,
     safeDelete(reservationPath);
 
-    List<String> reservationNodes = getChildren(planNodePath);
+    List<String> reservationNodes = zkUtils.getChildren(planNodePath);
 
     if (reservationNodes.isEmpty()) {
       safeDelete(planNodePath);
@@ -1121,7 +1119,7 @@ private void addOrUpdateReservationState(
         reservationIdName);
     byte[] reservationData = reservationAllocation.toByteArray();
-    if (!exists(planCreatePath)) {
+    if (!zkUtils.exists(planCreatePath)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
       }
@@ -1157,7 +1155,7 @@ private void createRootDirRecursively(String path) throws Exception {
     for (int i = 1; i < pathParts.length; i++) {
       sb.append("/").append(pathParts[i]);
-      create(sb.toString());
+      zkUtils.create(sb.toString());
     }
   }
@@ -1176,7 +1174,7 @@ private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
       if (splitIndex != appIdNodeSplitIndex) {
         String alternatePath =
             getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
-        if (exists(alternatePath)) {
+        if (zkUtils.exists(alternatePath)) {
           return new AppNodeSplitInfo(alternatePath, splitIndex);
         }
       }
@@ -1205,7 +1203,7 @@ private String getLeafAppIdNodePath(String appId, String rootNode,
     int splitIdx = nodeName.length() - appIdNodeSplitIdx;
     String rootNodePath =
         getNodePath(rootNode, nodeName.substring(0, splitIdx));
-    if (createParentIfNotExists && !exists(rootNodePath)) {
+    if (createParentIfNotExists && !zkUtils.exists(rootNodePath)) {
       try {
         safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
       } catch (KeeperException.NodeExistsException e) {
@@ -1234,45 +1232,9 @@ private String getLeafAppIdNodePath(String appId,
         appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
   }
 
-  @VisibleForTesting
-  byte[] getData(final String path) throws Exception {
-    return curatorFramework.getData().forPath(path);
-  }
-
-  @VisibleForTesting
-  List<ACL> getACL(final String path) throws Exception {
-    return curatorFramework.getACL().forPath(path);
-  }
-
-  @VisibleForTesting
-  List<String> getChildren(final String path) throws Exception {
-    return curatorFramework.getChildren().forPath(path);
-  }
-
-  @VisibleForTesting
-  boolean exists(final String path) throws Exception {
-    return curatorFramework.checkExists().forPath(path) != null;
-  }
-
-  @VisibleForTesting
-  void create(final String path) throws Exception {
-    if (!exists(path)) {
-      curatorFramework.create()
-          .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
-          .forPath(path, null);
-    }
-  }
-
-  @VisibleForTesting
-  void delete(final String path) throws Exception {
-    if (exists(path)) {
-      curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
-    }
-  }
-
   private void safeCreate(String path, byte[] data, List<ACL> acl,
       CreateMode mode) throws Exception {
-    if (!exists(path)) {
+    if (!zkUtils.exists(path)) {
       SafeTransaction transaction = new SafeTransaction();
       transaction.create(path, data, acl, mode);
       transaction.commit();
@@ -1285,7 +1247,7 @@ private void safeCreate(String path, byte[] data, List<ACL> acl,
    * @throws Exception if any problem occurs while performing deletion.
    */
   private void safeDelete(final String path) throws Exception {
-    if (exists(path)) {
+    if (zkUtils.exists(path)) {
       SafeTransaction transaction = new SafeTransaction();
       transaction.delete(path);
       transaction.commit();
@@ -1310,7 +1272,7 @@ private void safeSetData(String path, byte[] data, int version)
     private CuratorTransactionFinal transactionFinal;
 
     SafeTransaction() throws Exception {
-      CuratorTransaction transaction = curatorFramework.inTransaction();
+      CuratorTransaction transaction = zkUtils.getCurator().inTransaction();
       transactionFinal = transaction.create()
          .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
          .forPath(fencingNodePath, new byte[0]).and();
@@ -1364,4 +1326,34 @@ public void run() {
       }
     }
   }
+
+  @VisibleForTesting
+  protected List<ACL> getACL(String path) throws Exception {
+    return zkUtils.getACL(path);
+  }
+
+  @VisibleForTesting
+  protected void create(String path) throws Exception {
+    zkUtils.create(path);
+  }
+
+  @VisibleForTesting
+  protected boolean exists(String path) throws Exception {
+    return zkUtils.exists(path);
+  }
+
+  @VisibleForTesting
+  protected List<String> getChildren(String path) throws Exception {
+    return zkUtils.getChildren(path);
+  }
+
+  @VisibleForTesting
+  protected byte[] getData(String path) throws Exception {
+    return this.zkUtils.getData(path);
+  }
+
+  @VisibleForTesting
+  protected void delete(String path) throws Exception {
+    this.zkUtils.delete(path);
+  }
 }
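
Note: the YarnZKUtils helper that ZKRMStateStore now delegates to is defined outside this hunk. The following is only a rough, illustrative sketch of what such a wrapper around a started CuratorFramework could look like, inferred from the calls made above; the package, constructor wiring, configuration keys, and ACL choice here are assumptions, not the patch's actual implementation (the real class presumably also provides the getZKAcls and getZkRootNodePassword helpers used elsewhere in this change).

// Illustrative sketch only -- not part of this patch.
package org.apache.hadoop.yarn.server.utils;

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

public class YarnZKUtils {

  private final CuratorFramework curator;

  public YarnZKUtils(Configuration conf) {
    // Hypothetical wiring: connection string taken from the RM ZooKeeper
    // address key; retry policy hard-coded here for brevity, whereas a real
    // implementation would read it from configuration.
    this.curator = CuratorFrameworkFactory.builder()
        .connectString(conf.get(YarnConfiguration.RM_ZK_ADDRESS))
        .retryPolicy(new RetryNTimes(1000, 1000))
        .build();
    this.curator.start();
  }

  public CuratorFramework getCurator() {
    return curator;
  }

  public byte[] getData(String path) throws Exception {
    return curator.getData().forPath(path);
  }

  public List<String> getChildren(String path) throws Exception {
    return curator.getChildren().forPath(path);
  }

  public List<ACL> getACL(String path) throws Exception {
    return curator.getACL().forPath(path);
  }

  public boolean exists(String path) throws Exception {
    return curator.checkExists().forPath(path) != null;
  }

  public void create(String path) throws Exception {
    // Create a persistent, empty znode if it does not exist yet, mirroring
    // the helper removed from ZKRMStateStore (ACL is a placeholder here).
    if (!exists(path)) {
      curator.create().withMode(CreateMode.PERSISTENT)
          .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path, null);
    }
  }

  public void delete(String path) throws Exception {
    // Recursively delete the znode if present.
    if (exists(path)) {
      curator.delete().deletingChildrenIfNeeded().forPath(path);
    }
  }
}

With a wrapper of this shape, both ZKRMStateStore and the new ZookeeperFederationStateStore can share one Curator client and the same small set of znode operations, which is the refactoring this part of the patch performs.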