+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.StringKeyIgnoreCaseMultivaluedMap;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.xml.ws.http.HTTPException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * REST implementation of {@link FederationStateStore}.
+ */
+public class HttpProxyFederationStateStore implements FederationStateStore {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(HttpProxyFederationStateStore.class);
+
+ private String webAppUrl;
+ private String webAppResourceRootPath;
+ private final Clock clock = new MonotonicClock();
+
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ this.webAppUrl = WebAppUtils.HTTP_PREFIX + conf
+ .get(YarnConfiguration.FEDERATION_STATESTORE_HTTP_URL,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_URL);
+
+ this.webAppResourceRootPath = HttpProxyFederationStateStoreConsts.ROOT;
+ }
+
+ @Override
+ public SubClusterRegisterResponse registerSubCluster(
+ SubClusterRegisterRequest request) throws YarnException {
+
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(request);
+
+ SubClusterInfoDAO scInfoDAO =
+ new SubClusterInfoDAO(request.getSubClusterInfo());
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_REGISTER, scInfoDAO,
+ null);
+ } catch (HTTPException e) {
+ LOG.error("registerSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return SubClusterRegisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterDeregisterResponse deregisterSubCluster(
+ SubClusterDeregisterRequest request) throws YarnException {
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(request);
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_DEREGISTER,
+ new SubClusterDeregisterDAO(request), null);
+ } catch (HTTPException e) {
+ LOG.error("deregisterSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return SubClusterDeregisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
+ SubClusterHeartbeatRequest request) throws YarnException {
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(request);
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_HEARTBEAT,
+ new SubClusterHeartbeatDAO(request), null);
+ } catch (HTTPException e) {
+ LOG.error("subClusterHeartbeat REST call failed ", e);
+ return null;
+ }
+
+ return SubClusterHeartbeatResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterInfoResponse getSubCluster(
+ GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
+
+ SubClusterId scId = subClusterRequest.getSubClusterId();
+ String scIdStr = scId.getId();
+ SubClusterInfoDAO scInfoDao = null;
+
+ long startTime = clock.getTime();
+
+ try {
+ scInfoDao = invokeGet(
+ HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS + "/" + scIdStr,
+ null, SubClusterInfoDAO.class);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(clock.getTime() - startTime);
+ } catch (HTTPException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to obtain the SubCluster information for " + scIdStr, e);
+ }
+
+ if (scInfoDao == null) {
+ LOG.warn("The queried SubCluster: {} does not exist.", scIdStr);
+ return null;
+ }
+
+ SubClusterInfo scInfo = scInfoDao.toSubClusterInfo();
+
+ try {
+ FederationMembershipStateStoreInputValidator.checkSubClusterInfo(scInfo);
+ } catch (FederationStateStoreInvalidInputException e) {
+ String errMsg = "SubCluster " + scIdStr + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return GetSubClusterInfoResponse.newInstance(scInfo);
+ }
+
+ @Override
+ public GetSubClustersInfoResponse getSubClusters(
+ GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+
+ boolean filterInactiveSubClusters =
+ subClustersRequest.getFilterInactiveSubClusters();
+
+ MultivaluedMap
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.util.Apps;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * ApplicationHomeSubCluster is a report of the runtime information of the
+ * application that is running in the federated cluster.
+ *
+ * It includes information such as:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ * ApplicationHomeSubClustersDao is a report of the runtime information of the
+ * applications that are running in the federated cluster.
+ *
+ */
+@XmlRootElement(name = "applicationsHomeSubClusterDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ApplicationsHomeSubClusterDAO {
+
+ private ArrayList
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * The request sent to set the state of a subcluster to either
+ * SC_DECOMMISSIONED, SC_LOST, or SC_DEREGISTERED.
+ *
+ * The update includes details such as:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * SubClusterHeartbeatRequest is a report of the runtime information of the
+ * subcluster that is participating in federation.
+ *
+ * It includes information such as:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * SubClusterId represents the globally unique identifier for a
+ * subcluster that is participating in federation.
+ *
+ * The globally unique nature of the identifier is obtained from the
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * SubClusterInfo is a report of the runtime information of the subcluster tha
+ * is participating in federation.
+ *
+ *
+ * It includes information such as:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * SubClusterPoliciesConfigurationsDAO is a class that represents a set of a
+ * policies.
+ */
+@XmlRootElement(name = "subClusterPoliciesConfigurationsDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterPoliciesConfigurationsDAO {
+
+ private ArrayList
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link SubClusterPolicyConfigurationDAO} is a class that represents a
+ * configuration of a policy. For a single queue, it contains a policy type
+ * (resolve to a class name) and its params as an opaque {@link ByteBuffer}.
+ *
+ * Note: by design the params are an opaque ByteBuffer, this allows for enough
+ * flexibility to evolve the policies without impacting the protocols to/from
+ * the federation state store.
+ */
+@XmlRootElement(name = "subClusterPolicyConfigurationDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterPolicyConfigurationDAO {
+ private String queueName;
+ private String policyType;
+ private String policyParams;
+
+ public SubClusterPolicyConfigurationDAO() {
+ } // JAXB needs this
+
+ public SubClusterPolicyConfigurationDAO(
+ SubClusterPolicyConfiguration policy) {
+ this.queueName = policy.getQueue();
+ this.policyType = policy.getType();
+ this.policyParams = Base64.encodeBase64String(policy.getParams().array());
+ }
+
+ public SubClusterPolicyConfiguration toSubClusterPolicyConfiguration() {
+ return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
+ ByteBuffer.wrap(Base64.decodeBase64(policyParams)));
+ }
+
+ public String getPolicyType() {
+ return policyType;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getPolicyParams() {
+ return policyParams;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java
new file mode 100644
index 00000000000..6b1f1166425
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ *
+ * State of a
+ */
+@XmlRootElement(name = "subClusterStateDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterStateDAO {
+
+ protected enum SCState {
+ SC_NEW,
+
+ /**
+ * Subcluster is registered and the RM sent a heartbeat recently.
+ */
+ SC_RUNNING,
+
+ /**
+ * Subcluster is unhealthy.
+ */
+ SC_UNHEALTHY,
+
+ /**
+ * Subcluster is in the process of being out of service.
+ */
+ SC_DECOMMISSIONING,
+
+ /**
+ * Subcluster is out of service.
+ */
+ SC_DECOMMISSIONED,
+
+ /**
+ * RM has not sent a heartbeat for some configured time threshold.
+ */
+ SC_LOST,
+
+ /**
+ * Subcluster has unregistered.
+ */
+ SC_UNREGISTERED;
+ };
+
+ private SCState scState;
+
+ public SubClusterStateDAO() {
+ } // JAXB needs this
+
+ public SubClusterStateDAO(SubClusterState state) {
+ scState = SCState.valueOf(state.toString());
+ }
+
+ public SubClusterState getSubClusterState() {
+ return SubClusterState.valueOf(scState.toString());
+ }
+
+ public SCState getScState() {
+ return scState;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java
new file mode 100644
index 00000000000..7327a4126b6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * SubClustersInfoDAO is a class that represent a set of SubClusterInfo.
+ */
+@XmlRootElement(name = "subClustersInfoMap")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClustersInfoDAO {
+
+ private ArrayList
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+/** Federation StateStore DAO package.*/
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index df5f50c67a4..d4be4c34021 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -67,9 +67,11 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.slf4j.Logger;
@@ -465,6 +467,31 @@ public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
return;
}
+ /**
+ * Register a subcluster identified by {@code SubClusterId} to
+ * indicate participation in federation
+ *
+ * @param info the SubClusterInfo for the SubCluster that is participating
+ * in federation
+ */
+ public void registerSubCluster(SubClusterInfo info) throws YarnException {
+ stateStore.registerSubCluster(SubClusterRegisterRequest.newInstance(info));
+ }
+
+ /**
+ * Periodic heartbeat from a
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router that
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtoco
+ * messages from client to the cluster FederationStateStore. It listens
+ * {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request intercepting pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference with
+ * AMRMProxyService is the protocol they implement.
+ **/
+
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+ implements StateStoreWebServiceProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+ private final Router router;
+ private final Configuration conf;
+ private @Context HttpServletResponse response;
+
+ private Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the client to
+ * the resource manager server.
+ *
+ * This class includes all methods that the StateStore implements but in
+ * REST format.
+ */
+public interface StateStoreProxyInterceptor
+ extends Configurable, StateStoreWebServiceProtocol {
+
+ /**
+ * This method is called for initializing the intercepter. This is guaranteed
+ * to be called only once in the lifetime of this instance.
+ *
+ * @param user the name of the client
+ */
+ void init(String user);
+
+ /**
+ * This method is called to release the resources held by the intercepter.
+ * This will be called when the application pipeline is being destroyed. The
+ * concrete implementations should dispose the resources and forward the
+ * request to the next intercepter, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next intercepter in the pipeline. The concrete implementation of
+ * this interface should always pass the request to the nextInterceptor after
+ * inspecting the message. The last intercepter in the chain is responsible to
+ * send the messages to the resource manager service and so the last
+ * intercepter will not receive this method call.
+ *
+ * @param nextInterceptor the RESTRequestInterceptor to set in the pipeline
+ */
+ void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor);
+
+ /**
+ * Returns the next intercepter in the chain.
+ *
+ * @return the next intercepter in the chain
+ */
+ StateStoreProxyInterceptor getNextInterceptor();
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java
new file mode 100644
index 00000000000..8094a1b8b35
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+
+/**
+ * Extends the {@code StateStoreProxyInterceptor} class and provides an
+ * implementation that simply rejects all the client requests.
+ */
+public class StateStoreProxyRejectInterceptor implements
+ StateStoreProxyInterceptor {
+
+ private @Context Configuration conf;
+
+ @Override
+ public void init(String user) {
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public void setNextInterceptor(
+ StateStoreProxyInterceptor nextInterceptor) {
+ throw new YarnRuntimeException(
+ this.getClass().getName() + " should be the last interceptor");
+ }
+
+ @Override
+ public StateStoreProxyInterceptor getNextInterceptor() {
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+ throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO)
+ throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO)
+ throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response getSubCluster(String subClusterId) throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response getSubClusters(boolean filterInactiveSubclusters)
+ throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response getPoliciesConfigurations() throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response getPolicyConfiguration(String queue) throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response setPolicyConfiguration(
+ SubClusterPolicyConfigurationDAO policyConf) throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response addApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response updateApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response getApplicationHomeSubCluster(String appId)
+ throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response getApplicationsHomeSubCluster() throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+
+ @Override
+ public Response deleteApplicationHomeSubCluster(String appId)
+ throws YarnException {
+ return Response.status(501).entity(
+ "Request reached " + StateStoreProxyRejectInterceptor.class.getName())
+ .build();
+ }
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java
new file mode 100644
index 00000000000..f9f720afc76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+
+import javax.ws.rs.core.Response;
+
+/**
+ * The protocol between clients and the FederationStateStore
+ * over REST APIs
+ */
+
+public interface StateStoreWebServiceProtocol {
+
+ /**
+ * Register a subcluster by publishing capabilities as represented by
+ * {@code SubClusterInfo} to indicate participation in federation. This is
+ * typically done during initialization or restart/failover of the
+ * subcluster's
+ *
+ */
+@XmlRootElement(name = "applicationHomeSubClusterDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ApplicationHomeSubClusterDAO {
+
+ private String appId;
+ private SubClusterIdDAO subClusterId;
+
+ public ApplicationHomeSubClusterDAO() {
+ } // JAXB needs this
+
+ public ApplicationHomeSubClusterDAO(ApplicationHomeSubCluster appHome) {
+ appId = appHome.getApplicationId().toString();
+ subClusterId = new SubClusterIdDAO(appHome.getHomeSubCluster());
+ }
+
+ public ApplicationHomeSubClusterDAO(
+ AddApplicationHomeSubClusterRequest request) {
+ this(request.getApplicationHomeSubCluster());
+ }
+
+ public ApplicationHomeSubClusterDAO(
+ UpdateApplicationHomeSubClusterRequest request) {
+ this(request.getApplicationHomeSubCluster());
+ }
+
+ public ApplicationHomeSubCluster toApplicationHomeSubCluster() {
+ return ApplicationHomeSubCluster
+ .newInstance(Apps.toAppID(appId), subClusterId.toSubClusterId());
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public SubClusterIdDAO getSubClusterId() {
+ return subClusterId;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java
new file mode 100644
index 00000000000..baaa71fa728
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ */
+@XmlRootElement(name = "deregisterDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterDeregisterDAO {
+
+ private SubClusterIdDAO scId;
+ private SubClusterStateDAO scState;
+
+ public SubClusterDeregisterDAO() {
+ } // JAXB needs this
+
+ public SubClusterDeregisterDAO(SubClusterId id, SubClusterState state) {
+ this.scId = new SubClusterIdDAO(id);
+ this.scState = new SubClusterStateDAO(state);
+ }
+
+ public SubClusterDeregisterDAO(SubClusterDeregisterRequest request) {
+ this(request.getSubClusterId(), request.getState());
+ }
+
+ public SubClusterIdDAO getSubClusterId() {
+ return scId;
+ }
+
+ public SubClusterStateDAO getScState() {
+ return scState;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java
new file mode 100644
index 00000000000..2b40e869aba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ */
+@XmlRootElement(name = "heartbeatDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterHeartbeatDAO {
+
+ private SubClusterIdDAO scId;
+ private SubClusterStateDAO scState;
+ private String capability;
+
+ public SubClusterHeartbeatDAO() {
+ } // JAXB needs this
+
+ public SubClusterHeartbeatDAO(SubClusterId id, SubClusterState state,
+ String capability) {
+ this.scId = new SubClusterIdDAO(id);
+ this.scState = new SubClusterStateDAO(state);
+ this.capability = capability;
+ }
+
+ public SubClusterHeartbeatDAO(SubClusterHeartbeatRequest request) {
+ this(request.getSubClusterId(), request.getState(),
+ request.getCapability());
+ }
+
+ public SubClusterIdDAO getSubClusterId() {
+ return scId;
+ }
+
+ public SubClusterStateDAO getScState() {
+ return scState;
+ }
+
+ public String getCapability() {
+ return capability;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java
new file mode 100644
index 00000000000..46a25e84de5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * FederationMembershipStateStore on initialization.
+ */
+@XmlRootElement(name = "subClustersIdDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterIdDAO {
+
+ private String subClusterId;
+
+ public SubClusterIdDAO() {
+ } // JAXB needs this
+
+ public SubClusterIdDAO(SubClusterId scId) {
+ subClusterId = scId.getId();
+ }
+
+ public SubClusterId toSubClusterId(){
+ return SubClusterId.newInstance(subClusterId);
+ }
+
+ public String getSubClusterId() {
+ return subClusterId;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java
new file mode 100644
index 00000000000..9b89c26b73e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ */
+
+@XmlRootElement(name = "subClusterInfoDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterInfoDAO {
+
+ private String subClusterId;
+ private String amRMServiceAddress;
+ private String clientRMServiceAddress;
+ private String rmAdminServiceAddress;
+ private String rmWebServiceAddress;
+ private long lastHeartBeat;
+ private SubClusterStateDAO state;
+ private long lastStartTime;
+ private String capability;
+
+ public SubClusterInfoDAO() {
+ } // JAXB needs this
+
+ public SubClusterInfoDAO(SubClusterInfo scinfo) {
+ this.subClusterId = scinfo.getSubClusterId().toString();
+ this.amRMServiceAddress = scinfo.getAMRMServiceAddress();
+ this.clientRMServiceAddress = scinfo.getClientRMServiceAddress();
+ this.rmAdminServiceAddress = scinfo.getRMAdminServiceAddress();
+ this.rmWebServiceAddress = scinfo.getRMWebServiceAddress();
+ this.lastHeartBeat = scinfo.getLastHeartBeat();
+ this.state = new SubClusterStateDAO(scinfo.getState());
+ this.lastStartTime = scinfo.getLastStartTime();
+ this.capability = scinfo.getCapability();
+ }
+
+ public SubClusterInfo toSubClusterInfo() {
+ return SubClusterInfo
+ .newInstance(SubClusterId.newInstance(subClusterId), amRMServiceAddress,
+ clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+ this.lastHeartBeat, state.getSubClusterState(), lastStartTime,
+ capability);
+ }
+
+ public String getRmAdminServiceAddress() {
+ return rmAdminServiceAddress;
+ }
+
+ public SubClusterStateDAO getState() {
+ return state;
+ }
+
+ public long getLastStartTime() {
+ return lastStartTime;
+ }
+
+ public String getAmRMServiceAddress() {
+ return amRMServiceAddress;
+ }
+
+ public long getLastHeartBeat() {
+ return lastHeartBeat;
+ }
+
+ public String getCapability() {
+ return capability;
+ }
+
+ public String getClientRMServiceAddress() {
+ return clientRMServiceAddress;
+ }
+
+ public String getSubClusterId() {
+ return subClusterId;
+ }
+
+ public String getRmWebServiceAddress() {
+ return rmWebServiceAddress;
+ }
+
+ public void setAmRMServiceAddress(String amRMServiceAddress) {
+ this.amRMServiceAddress = amRMServiceAddress;
+ }
+}
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java
new file mode 100644
index 00000000000..79be71d26ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * SubCluster.
+ * ResourceManager to indicate
+ * liveliness.
+ *
+ * @param subClusterId the id of the sub cluster heart beating
+ * @param state the state of the sub cluster
+ * @param capability the capability of the sub cluster
+ */
+ public void subClusterHeartBeat(SubClusterId subClusterId,
+ SubClusterState state, String capability) throws YarnException {
+ stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
+ .newInstance(subClusterId, state, capability));
+ }
+
/**
* Get the singleton instance of SubClusterResolver.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
new file mode 100644
index 00000000000..45755eed366
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * ResourceManager. Upon successful registration, an
+ * identifier for the subcluster which is unique across the federated
+ * cluster is returned. The identifier is static, i.e. preserved across
+ * restarts and failover.
+ *
+ * @param scInfoDAO the capabilities of the subcluster that
+ * wants to participate in federation. The subcluster id is also
+ * specified in case registration is triggered by restart/failover
+ * @return response empty on successfully if registration was successful
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response registerSubCluster(SubClusterInfoDAO scInfoDAO) throws YarnException;
+
+ /**
+ * Deregister a subcluster identified by {@code SubClusterId} to
+ * change state in federation. This can be done to mark the sub cluster lost,
+ * deregistered, or decommissioned.
+ *
+ * @param deregisterDAO - the request to deregister the
+ * sub-cluster from federation.
+ * @return response empty on successfully deregistering the subcluster state
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO)
+ throws YarnException;
+
+ /**
+ * Periodic heartbeat from a ResourceManager participating in
+ * federation to indicate liveliness. The heartbeat publishes the current
+ * capabilities as represented by {@code SubClusterInfo} of the subcluster.
+ * Currently response is empty if the operation was successful, if not an
+ * exception reporting reason for a failure.
+ *
+ * @param heartbeatDAO the capabilities of the subcluster that
+ * wants to keep alive its participation in federation
+ * @return response currently empty on if heartbeat was successfully processed
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO)
+ throws YarnException;
+
+ /**
+ * Get the membership information of subcluster as identified by
+ * {@code SubClusterId}. The membership information includes the cluster
+ * endpoint and current capabilities as represented by {@code SubClusterInfo}.
+ *
+ * @param subClusterId the subcluster whose information is required
+ * @return the {@code SubClusterInfo}, or {@code null} if there is no mapping
+ * for the subcluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getSubCluster(String subClusterId) throws YarnException;
+
+ /**
+ * Get the membership information of all the subclusters that are
+ * currently participating in federation. The membership information includes
+ * the cluster endpoint and current capabilities as represented by
+ * {@code SubClusterInfo}.
+ *
+ * @param filterInactiveSubclusters whether to filter inactive sub clusters
+ * @return a map of {@code SubClusterInfo} keyed by the {@code SubClusterId}
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getSubClusters(boolean filterInactiveSubclusters)
+ throws YarnException;
+
+ /**
+ * Get a map of all queue-to-policy configurations.
+ *
+ * @return the policies for all currently active queues in the system
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getPoliciesConfigurations() throws YarnException;
+
+ /**
+ * Get the policy configuration for a given queue.
+ *
+ * @param queue the queue whose {@code SubClusterPolicyConfiguration} is
+ * required
+ * @return the {@code SubClusterPolicyConfiguration} for the specified queue,
+ * or {@code null} if there is no mapping for the queue
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getPolicyConfiguration(String queue) throws YarnException;
+
+ /**
+ * Set the policy configuration for a given queue.
+ *
+ * @param policyConf the {@code SubClusterPolicyConfiguration} with the
+ * corresponding queue
+ * @return response empty on successfully updating the
+ * {@code SubClusterPolicyConfiguration} for the specified queue
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response setPolicyConfiguration(SubClusterPolicyConfigurationDAO policyConf)
+ throws YarnException;
+
+ /**
+ * Register the home {@code SubClusterId} of the newly submitted
+ * {@code ApplicationId}. Currently response is empty if the operation was
+ * successful, if not an exception reporting reason for a failure. If a
+ * mapping for the application already existed, the {@code SubClusterId} in
+ * this response will return the existing mapping which might be different
+ * from that in the {@code AddApplicationHomeSubClusterRequest}.
+ *
+ * @param appHomeDAO the request to register a new application with its home
+ * sub-cluster
+ * @return upon successful registration of the application in the StateStore,
+ * {@code AddApplicationHomeSubClusterRequest} containing the home
+ * sub-cluster of the application. Otherwise, an exception reporting
+ * reason for a failure
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response addApplicationHomeSubCluster(ApplicationHomeSubClusterDAO appHomeDAO)
+ throws YarnException;
+
+ /**
+ * Update the home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Currently response is empty if the operation was
+ * successful, if not an exception reporting reason for a failure.
+ *
+ * @param appHomeDAO the request to update the home sub-cluster of an
+ * application.
+ * @return empty on successful update of the application in the StateStore, if
+ * not an exception reporting reason for a failure
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response updateApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException;
+
+ /**
+ * Get information about the application identified by the input
+ * {@code ApplicationId}.
+ *
+ * @param appId the application to query
+ * @return {@code ApplicationHomeSubCluster} containing the application's home
+ * subcluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getApplicationHomeSubCluster(String appId) throws YarnException;
+
+ /**
+ * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+ * all submitted applications to it's home sub-cluster.
+ *
+ * @return the mapping of all submitted application to it's home sub-cluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getApplicationsHomeSubCluster() throws YarnException;
+
+ /**
+ * Delete the mapping of home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Currently response is empty if the operation was
+ * successful, if not an exception reporting reason for a failure.
+ *
+ * @param appId the ID of the application to delete
+ * @return empty on successful update of the application in the StateStore, if
+ * not an exception reporting reason for a failure
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response deleteApplicationHomeSubCluster(String appId) throws YarnException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java
new file mode 100644
index 00000000000..ff50e1c18a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router Federation StateStore Proxy package. **/
+package org.apache.hadoop.yarn.server.router.ssproxy;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java
index ba07a1afba4..e8af9e4bb85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.router.ssproxy.RouterStateStoreProxyWebServices;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
@@ -38,6 +39,7 @@ public RouterWebApp(Router router) {
public void setup() {
bind(JAXBContextResolver.class);
bind(RouterWebServices.class);
+ bind(RouterStateStoreProxyWebServices.class);
bind(GenericExceptionHandler.class);
bind(RouterWebApp.class).toInstance(this);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java
new file mode 100644
index 00000000000..c3b1e4c5d03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java
@@ -0,0 +1,79 @@
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.FederationStateStoreBaseTest;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStore;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Validate the correctness of the StateStoreProxy in the router.
+ * This test case works by setting the facade to use a MemoryStateStore,
+ * and then testing the HttpProxyFederationStateStore, which communicates
+ * via REST with the StateStoreProxyWebServices
+ */
+public class TestRouterStateStoreProxy extends FederationStateStoreBaseTest {
+
+ private static Router router;
+
+ private static FederationStateStore proxyStore;
+
+ public TestRouterStateStoreProxy() {
+ }
+
+ @BeforeClass
+ public static void setUpProxy() {
+ // Set up the router proxy to use a memory state store
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
+ MemoryFederationStateStore.class.getName());
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+ conf.set(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
+ StateStoreProxyDefaultInterceptor.class.getName());
+ proxyStore = new MemoryFederationStateStore();
+ FederationStateStoreFacade.getInstance().reinitialize(proxyStore, conf);
+ router = new Router();
+ router.init(conf);
+ router.start();
+ }
+
+ @AfterClass
+ public static void tearDownProxy() {
+ router.stop();
+ }
+
+ @Before
+ @Override
+ public void before() throws IOException, YarnException {
+ super.before();
+ proxyStore.init(getConf());
+ }
+
+ @After
+ @Override
+ public void after() throws Exception {
+ super.after();
+ proxyStore.close();
+ }
+
+ @Override
+ protected FederationStateStore createStateStore() {
+ Configuration conf = new Configuration();
+
+ // Set up our local state store
+ super.setConf(conf);
+ return new HttpProxyFederationStateStore();
+ }
+
+}
+