diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cd4d569a77a..195adada541 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2590,7 +2590,7 @@ public static boolean isAclEnabled(Configuration conf) { FEDERATION_PREFIX + "state-store.class"; public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS = - "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore"; + "org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore"; public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; @@ -2629,6 +2629,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = ""; + public static final String FEDERATION_STATESTORE_ZK_PREFIX = + FEDERATION_PREFIX + "zk-state-store."; + /** Parent znode path under which ZKRMStateStore will create znodes */ + public static final String FEDERATION_STATESTORE_RM_PARENT_PATH = + FEDERATION_STATESTORE_ZK_PREFIX + "parent-path"; + public static final String DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH = + "/federationstore"; + private static final String FEDERATION_STATESTORE_SQL_PREFIX = FEDERATION_PREFIX + "state-store.sql."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java new file mode 100644 index 00000000000..15333ef3df8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -0,0 +1,653 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+
+/**
+ * ZooKeeper implementation of {@link FederationStateStore}.
+ *
+ * The znode structure is as follows:
+ * ROOT_DIR_PATH
+ * |--- MEMBERSHIP
+ * | |----- SC1
+ * | |----- SC2
+ * |--- APPLICATION
+ * | |----- APP1
+ * | |----- APP2
+ * |--- POLICY
+ * |----- QUEUE1
+ * |----- QUEUE1
+ */
+public class ZookeeperFederationStateStore implements FederationStateStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
+
+ private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
+ private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
+ private final static String ROOT_ZNODE_NAME_POLICY = "policies";
+
+ /** Interface to Zookeeper. */
+ private ZKCuratorManager zkManager;
+
+ /** Directory to store the state store data. */
+ private String baseZNode;
+
+ private String appsZNode;
+ private String membershipZNode;
+ private String policiesZNode;
+
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ LOG.info("Initializing ZooKeeper connection");
+
+ baseZNode = conf.get(
+ YarnConfiguration.FEDERATION_STATESTORE_RM_PARENT_PATH,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_RM_PARENT_PATH);
+ try {
+ this.zkManager = new ZKCuratorManager(conf);
+ this.zkManager.start();
+ } catch (IOException e) {
+ LOG.error("Cannot initialize the ZK connection", e);
+ }
+
+ // Base znodes
+ membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+ appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+ policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+
+ // Create base znode for each entity
+ try {
+ createRootDirRecursively(membershipZNode);
+ createRootDirRecursively(appsZNode);
+ createRootDirRecursively(policiesZNode);
+ } catch (Exception e) {
+ String errMsg = "Cannot create base directories: " + e.getMessage();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (zkManager != null) {
+ zkManager.close();
+ }
+ }
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+ ApplicationId appId = app.getApplicationId();
+
+ // Try to write the subcluster
+ SubClusterId homeSubCluster = app.getHomeSubCluster();
+ try {
+ putApp(appId, homeSubCluster, false);
+ } catch (Exception e) {
+ String errMsg = "Cannot add application home subcluster for " + appId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ // Check for the actual subcluster
+ try {
+ homeSubCluster = getApp(appId);
+ } catch (Exception e) {
+ String errMsg = "Cannot check app home subcluster for " + appId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return AddApplicationHomeSubClusterResponse
+ .newInstance(homeSubCluster);
+ }
+
+ @Override
+ public UpdateApplicationHomeSubClusterResponse
+ updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest request)
+ throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+ ApplicationId appId = app.getApplicationId();
+ SubClusterId homeSubCluster = getApp(appId);
+ if (homeSubCluster == null) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ SubClusterId newSubClusterId =
+ request.getApplicationHomeSubCluster().getHomeSubCluster();
+ putApp(appId, newSubClusterId, true);
+ return UpdateApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+ ApplicationId appId = request.getApplicationId();
+ SubClusterId homeSubCluster = getApp(appId);
+ if (homeSubCluster == null) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ return GetApplicationHomeSubClusterResponse.newInstance(
+ ApplicationHomeSubCluster.newInstance(appId, homeSubCluster));
+ }
+
+ @Override
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest request) throws YarnException {
+ List
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ZookeeperFederationStateStore.
+ */
+public class TestZookeeperFederationStateStore
+ extends FederationStateStoreBaseTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestZookeeperFederationStateStore.class);
+
+ /** Zookeeper test server. */
+ private static TestingServer curatorTestingServer;
+ private static CuratorFramework curatorFramework;
+
+ @Before
+ public void before() throws IOException, YarnException {
+ try {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ String connectString = curatorTestingServer.getConnectString();
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(connectString)
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, connectString);
+ setConf(conf);
+ } catch (Exception e) {
+ LOG.error("Cannot initialize ZooKeeper store", e);
+ throw new IOException(e);
+ }
+
+ super.before();
+ }
+
+ @After
+ public void after() throws Exception {
+ super.after();
+
+ curatorFramework.close();
+ try {
+ curatorTestingServer.stop();
+ } catch (IOException e) {
+ }
+ }
+
+ @Override
+ protected FederationStateStore createStateStore() {
+ Configuration conf = new Configuration();
+ super.setConf(conf);
+ return new ZookeeperFederationStateStore();
+ }
+}
\ No newline at end of file