Comma-separated list of PlacementRules to determine how applications
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStore.java
new file mode 100644
index 00000000000..2f596e76069
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStore.java
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.StringKeyIgnoreCaseMultivaluedMap;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.xml.ws.http.HTTPException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * REST implementation of {@link FederationStateStore}.
+ */
+public class HttpProxyFederationStateStore implements FederationStateStore {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(HttpProxyFederationStateStore.class);
+
+ private String webAppUrl;
+ private String webAppResourceRootPath;
+ private final Clock clock = new MonotonicClock();
+ private Map subClusters;
+
+ // It is very expensive to create the client
+ // Jersey will spawn a thread for every client request
+ private Client client = null;
+
+ public final static String FEDERATION_DEFAULT_SUB_CLUSTERS =
+ "yarn.federation.default.subclusters";
+
+ public final static String FEDERATION_STATE_STORE_FALLBACK_ENABLED =
+ "yarn.federation.statestore.fallback.enabled";
+ public final static boolean DEFAULT_FEDERATION_STATE_STORE_TIMEOUT_ENABLED =
+ false;
+
+ private boolean fallbackToConfig;
+
+ /**
+ * Initializes the store: resolves the proxy URL, creates the shared
+ * Jersey client, and loads the static fallback sub-cluster list.
+ *
+ * @param conf configuration supplying the proxy URL, client timeouts and
+ * the fallback sub-cluster list
+ * @throws YarnException if the fallback sub-cluster list is not configured
+ */
+ @Override
+ public void init(Configuration conf) throws YarnException {
+ this.webAppUrl = WebAppUtils.HTTP_PREFIX
+ + conf.get(YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_URL,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_URL);
+
+ this.webAppResourceRootPath = HttpProxyFederationStateStoreConsts.ROOT;
+ // The Jersey client is expensive to create (it spawns threads), so it is
+ // built once here and reused for every request.
+ this.client = createJerseyClient(conf);
+ this.subClusters = new ConcurrentHashMap<>();
+ // When enabled, getSubClusters() serves the statically configured
+ // membership if the REST proxy is unreachable.
+ this.fallbackToConfig =
+ conf.getBoolean(FEDERATION_STATE_STORE_FALLBACK_ENABLED,
+ DEFAULT_FEDERATION_STATE_STORE_TIMEOUT_ENABLED);
+ loadSubClustersFromConfig(conf);
+ }
+
+  /**
+   * Loads the default sub-cluster list from configuration, used as a
+   * fallback when the state store REST endpoint is unreachable.
+   *
+   * The value of {@code yarn.federation.default.subclusters} is read as a
+   * comma-separated list of 6-token groups:
+   * subClusterId, rmAddr, amRmPort, clientRmPort, adminPort, webPort.
+   *
+   * @param conf configuration to read the sub-cluster list from
+   * @throws YarnException if the property is missing or truncated
+   */
+  protected void loadSubClustersFromConfig(Configuration conf)
+      throws YarnException {
+    String str = conf.get(FEDERATION_DEFAULT_SUB_CLUSTERS);
+    if (str == null) {
+      throw new YarnException("Default subclusters not configured in "
+          + FEDERATION_DEFAULT_SUB_CLUSTERS);
+    }
+    // getStringCollection returns a Collection<String>; the original raw
+    // Iterator lost that element type.
+    Iterator<String> iter = StringUtils.getStringCollection(str).iterator();
+    while (iter.hasNext()) {
+      SubClusterId scId = SubClusterId.newInstance(iter.next().trim());
+      try {
+        String rmAddr = iter.next().trim();
+        String amRmPort = iter.next().trim();
+        String clientRmPort = iter.next().trim();
+        String adminPort = iter.next().trim();
+        String webPort = iter.next().trim();
+        subClusters.put(scId,
+            SubClusterInfo.newInstance(scId, rmAddr + ":" + amRmPort,
+                rmAddr + ":" + clientRmPort, rmAddr + ":" + adminPort,
+                rmAddr + ":" + webPort, 0, SubClusterState.SC_RUNNING, 0, ""));
+      } catch (java.util.NoSuchElementException e) {
+        // The original blindly called iter.next() five more times per entry
+        // and would throw an undocumented unchecked exception on a
+        // truncated list; surface a descriptive checked error instead.
+        throw new YarnException("Malformed " + FEDERATION_DEFAULT_SUB_CLUSTERS
+            + ": expected groups of 6 tokens, got truncated entry for "
+            + scId, e);
+      }
+      LOG.info("Loaded subcluster from config: {}", subClusters.get(scId));
+    }
+  }
+
+ /**
+ * Registers a sub-cluster with the state store via the REST proxy.
+ *
+ * @param request the registration request; validated before the call
+ * @return a fresh response on success, or null if the REST call failed.
+ * NOTE(review): swallowing the failure and returning null differs
+ * from the read paths, which rethrow - confirm callers handle null.
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public SubClusterRegisterResponse registerSubCluster(
+ SubClusterRegisterRequest request) throws YarnException {
+
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(request);
+
+ // Wrap the record in its JAXB DAO form for the REST request body.
+ SubClusterInfoDAO scInfoDAO =
+ new SubClusterInfoDAO(request.getSubClusterInfo());
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_REGISTER, scInfoDAO,
+ null);
+ } catch (HTTPException e) {
+ LOG.error("registerSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return SubClusterRegisterResponse.newInstance();
+ }
+
+ /**
+ * Deregisters a sub-cluster (moves it to a terminal state) via the REST
+ * proxy.
+ *
+ * @param request the deregister request; validated before the call
+ * @return a fresh response on success, or null if the REST call failed
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public SubClusterDeregisterResponse deregisterSubCluster(
+ SubClusterDeregisterRequest request) throws YarnException {
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(request);
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_DEREGISTER,
+ new SubClusterDeregisterDAO(request), null);
+ } catch (HTTPException e) {
+ LOG.error("deregisterSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return SubClusterDeregisterResponse.newInstance();
+ }
+
+ /**
+ * Forwards a sub-cluster heartbeat to the state store via the REST proxy.
+ *
+ * @param request the heartbeat request; validated before the call
+ * @return a fresh response on success, or null if the REST call failed
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
+ SubClusterHeartbeatRequest request) throws YarnException {
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(request);
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_HEARTBEAT,
+ new SubClusterHeartbeatDAO(request), null);
+ } catch (HTTPException e) {
+ LOG.error("subClusterHeartbeat REST call failed ", e);
+ return null;
+ }
+
+ return SubClusterHeartbeatResponse.newInstance();
+ }
+
+ /**
+ * Looks up a single sub-cluster by id via the REST proxy.
+ *
+ * @param subClusterRequest carries the sub-cluster id to query
+ * @return the sub-cluster info, or null when the proxy reports it absent
+ * @throws YarnException on validation failure, on a retriable REST
+ * failure, or when the returned record fails validation
+ */
+ @Override
+ public GetSubClusterInfoResponse getSubCluster(
+ GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+ // Input validator
+ FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
+
+ SubClusterId scId = subClusterRequest.getSubClusterId();
+ String scIdStr = scId.getId();
+ SubClusterInfoDAO scInfoDao = null;
+
+ // Latency of the remote call is reported to the client metrics.
+ long startTime = clock.getTime();
+
+ try {
+ scInfoDao = invokeGet(
+ HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS + "/" + scIdStr,
+ null, SubClusterInfoDAO.class);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(clock.getTime() - startTime);
+ } catch (HTTPException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to obtain the SubCluster information for " + scIdStr, e);
+ }
+
+ // A 404 from the proxy surfaces as a null DAO (see invoke helper).
+ if (scInfoDao == null) {
+ LOG.warn("The queried SubCluster: {} does not exist.", scIdStr);
+ return null;
+ }
+
+ SubClusterInfo scInfo = scInfoDao.toSubClusterInfo();
+
+ try {
+ FederationMembershipStateStoreInputValidator.checkSubClusterInfo(scInfo);
+ } catch (FederationStateStoreInvalidInputException e) {
+ // NOTE(review): the record exists but is invalid; the "does not
+ // exist" wording here is misleading - consider rewording.
+ String errMsg = "SubCluster " + scIdStr + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ return GetSubClusterInfoResponse.newInstance(scInfo);
+ }
+
+  /**
+   * Retrieves the info of all known sub-clusters via the REST proxy. When
+   * the call fails and config fallback is enabled, the statically
+   * configured sub-cluster list is returned instead.
+   *
+   * @param subClustersRequest carries the filter-inactive flag
+   * @return response with the validated list of sub-clusters
+   * @throws YarnException on REST failure without fallback, or when a
+   *         returned record fails validation
+   */
+  @Override
+  public GetSubClustersInfoResponse getSubClusters(
+      GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+
+    boolean filterInactiveSubClusters =
+        subClustersRequest.getFilterInactiveSubClusters();
+
+    // Restored generic types that were stripped from the original; the raw
+    // MultivaluedMap forced unchecked calls throughout.
+    MultivaluedMap<String, String> queryParams =
+        new StringKeyIgnoreCaseMultivaluedMap<String>();
+    queryParams.add(HttpProxyFederationStateStoreConsts.QUERY_SC_FILTER,
+        Boolean.toString(filterInactiveSubClusters));
+
+    SubClustersInfoDAO scsInfoDao = null;
+    long startTime = clock.getTime();
+
+    try {
+      scsInfoDao =
+          invokeGet(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS,
+              queryParams, SubClustersInfoDAO.class);
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(clock.getTime() - startTime);
+    } catch (Exception e) {
+      // Deliberately broad: any failure (not just HTTP errors) should be
+      // able to trigger the config fallback below.
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      if (fallbackToConfig) {
+        List<SubClusterInfo> subClustersInfo =
+            new ArrayList<SubClusterInfo>(subClusters.values());
+        return GetSubClustersInfoResponse.newInstance(subClustersInfo);
+      }
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the information for all the SubClusters ", e);
+    }
+
+    List<SubClusterInfo> scInfoList = new ArrayList<>();
+
+    if (scsInfoDao != null) {
+      for (SubClusterInfoDAO scInfoDao : scsInfoDao.getSubClusters()) {
+        SubClusterInfo scInfo = scInfoDao.toSubClusterInfo();
+        try {
+          FederationMembershipStateStoreInputValidator
+              .checkSubClusterInfo(scInfo);
+        } catch (FederationStateStoreInvalidInputException e) {
+          String errMsg = "SubCluster " + scInfo.getSubClusterId().toString()
+              + " is not valid";
+          FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+        }
+        scInfoList.add(scInfo);
+      }
+    }
+
+    return GetSubClustersInfoResponse.newInstance(scInfoList);
+  }
+
+  /**
+   * Retrieves the policy configurations of all queues via the REST proxy.
+   *
+   * @param request unused beyond protocol conformance
+   * @return response holding one policy per queue; empty list when the
+   *         proxy returns no entity
+   * @throws YarnException when the REST call fails
+   */
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+
+    SubClusterPoliciesConfigurationsDAO scPoliciesConfDao = null;
+    long startTime = clock.getTime();
+
+    try {
+      scPoliciesConfDao =
+          invokeGet(HttpProxyFederationStateStoreConsts.PATH_POLICY, null,
+              SubClusterPoliciesConfigurationsDAO.class);
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(clock.getTime() - startTime);
+    } catch (HTTPException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the policy information for all the queues.", e);
+    }
+
+    // Restored element type: the original raw List discarded compile-time
+    // checking of what is handed to the response.
+    List<SubClusterPolicyConfiguration> scPoliciesList = new ArrayList<>();
+
+    if (scPoliciesConfDao != null) {
+      for (SubClusterPolicyConfigurationDAO scPolicyDao
+          : scPoliciesConfDao.getPolicies()) {
+        scPoliciesList.add(scPolicyDao.toSubClusterPolicyConfiguration());
+      }
+    }
+
+    return GetSubClusterPoliciesConfigurationsResponse
+        .newInstance(scPoliciesList);
+  }
+
+ /**
+ * Looks up the policy configuration of a single queue via the REST proxy.
+ *
+ * @param request carries the queue name; validated before the call
+ * @return the queue's policy, or null when the proxy reports it absent
+ * @throws YarnException on validation failure or a retriable REST failure
+ */
+ @Override
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+ // Input validator
+ FederationPolicyStoreInputValidator.validate(request);
+
+ String queueName = request.getQueue();
+ SubClusterPolicyConfigurationDAO scPolicyConfDao = null;
+
+ // Latency of the remote call is reported to the client metrics.
+ long startTime = clock.getTime();
+
+ try {
+ scPolicyConfDao = invokeGet(
+ HttpProxyFederationStateStoreConsts.PATH_POLICY + "/" + queueName,
+ null, SubClusterPolicyConfigurationDAO.class);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(clock.getTime() - startTime);
+ } catch (HTTPException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to select the policy for the queue :" + queueName, e);
+ }
+
+ // A 404 from the proxy surfaces as a null DAO (see invoke helper).
+ if (scPolicyConfDao == null) {
+ LOG.warn("Policy for queue: {} does not exist.", queueName);
+ return null;
+ }
+
+ SubClusterPolicyConfiguration scPolicy =
+ scPolicyConfDao.toSubClusterPolicyConfiguration();
+
+ return GetSubClusterPolicyConfigurationResponse.newInstance(scPolicy);
+ }
+
+ /**
+ * Stores (creates or replaces) a queue's policy configuration via the
+ * REST proxy.
+ *
+ * @param request the policy to store; validated before the call
+ * @return a fresh response on success, or null if the REST call failed
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+ // Input validator
+ FederationPolicyStoreInputValidator.validate(request);
+
+ try {
+ invokePost(HttpProxyFederationStateStoreConsts.PATH_POLICY,
+ new SubClusterPolicyConfigurationDAO(
+ request.getPolicyConfiguration()),
+ null);
+ } catch (HTTPException e) {
+ LOG.error("setPolicyConfiguration REST call failed ", e);
+ return null;
+ }
+
+ return SetSubClusterPolicyConfigurationResponse.newInstance();
+ }
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ SubClusterIdDAO scIdDAO;
+
+ try {
+ scIdDAO = invokePost(HttpProxyFederationStateStoreConsts.PATH_APP_HOME,
+ new ApplicationHomeSubClusterDAO(request), SubClusterIdDAO.class);
+ } catch (HTTPException e) {
+ LOG.error("addApplicationHomeSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return AddApplicationHomeSubClusterResponse
+ .newInstance(scIdDAO.toSubClusterId());
+ }
+
+ /**
+ * Updates the home sub-cluster mapping of an application via the REST
+ * proxy.
+ *
+ * @param request the new mapping; validated before the call
+ * @return a fresh response on success, or null if the REST call failed
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ try {
+ invokePut(HttpProxyFederationStateStoreConsts.PATH_APP_HOME,
+ new ApplicationHomeSubClusterDAO(request));
+ } catch (HTTPException e) {
+ LOG.error("updateApplicationHomeSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return UpdateApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ /**
+ * Looks up the home sub-cluster of a single application via the REST
+ * proxy.
+ *
+ * @param request carries the application id; validated before the call
+ * @return the mapping, or null when the REST call failed or the
+ * application is unknown to the store
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ String appId = request.getApplicationId().toString();
+
+ SubClusterIdDAO scIdDAO;
+ try {
+ scIdDAO = invokeGet(
+ HttpProxyFederationStateStoreConsts.PATH_APP_HOME + "/" + appId, null,
+ SubClusterIdDAO.class);
+ } catch (HTTPException e) {
+ LOG.error("getApplicationHomeSubCluster REST call failed ", e);
+ return null;
+ }
+
+ // A 404 from the proxy surfaces as a null DAO (see invoke helper).
+ if (scIdDAO == null) {
+ return null;
+ }
+
+ return GetApplicationHomeSubClusterResponse
+ .newInstance(ApplicationHomeSubCluster
+ .newInstance(request.getApplicationId(), scIdDAO.toSubClusterId()));
+ }
+
+  /**
+   * Retrieves the home sub-cluster mapping of every known application via
+   * the REST proxy.
+   *
+   * @param request unused beyond protocol conformance
+   * @return the mappings, or null when the REST call failed or returned no
+   *         entity
+   * @throws YarnException when the proxy returns an error status
+   */
+  @Override
+  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    ApplicationsHomeSubClusterDAO ret;
+    try {
+      ret = invokeGet(HttpProxyFederationStateStoreConsts.PATH_APP_HOME, null,
+          ApplicationsHomeSubClusterDAO.class);
+    } catch (HTTPException e) {
+      LOG.error("getApplicationsHomeSubCluster REST call failed ", e);
+      return null;
+    }
+
+    // A 404 surfaces as null; the original dereferenced ret unconditionally
+    // and would NPE here.
+    if (ret == null) {
+      return null;
+    }
+
+    return GetApplicationsHomeSubClusterResponse
+        .newInstance(ret.toApplicationsHome());
+  }
+
+ /**
+ * Deletes the home sub-cluster mapping of an application via the REST
+ * proxy.
+ *
+ * @param request carries the application id; validated before the call
+ * @return a fresh response on success, or null if the REST call failed
+ * @throws YarnException if the request fails input validation
+ */
+ @Override
+ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ String appId = request.getApplicationId().toString();
+
+ try {
+ invokeDelete(
+ HttpProxyFederationStateStoreConsts.PATH_APP_HOME + "/" + appId);
+ } catch (HTTPException e) {
+ LOG.error("deleteApplicationHomeSubCluster REST call failed ", e);
+ return null;
+ }
+
+ return DeleteApplicationHomeSubClusterResponse.newInstance();
+ }
+
+  /**
+   * Shuts down the store and releases the shared Jersey client. The client
+   * owns background threads, so leaving the original empty close() in
+   * place leaked them for the lifetime of the JVM.
+   *
+   * @throws Exception declared by the interface; not thrown here
+   */
+  @Override
+  public void close() throws Exception {
+    if (client != null) {
+      client.destroy();
+      client = null;
+    }
+  }
+
+ /** Version bookkeeping is not supported by this implementation. */
+ @Override
+ public Version getCurrentVersion() {
+ throw new NotImplementedException("Code is not implemented");
+ }
+
+ /** Version bookkeeping is not supported by this implementation. */
+ @Override
+ public Version loadVersion() {
+ throw new NotImplementedException("Code is not implemented");
+ }
+
+  /**
+   * GET {@code path}, unmarshalling the response as {@code responseClass}.
+   * The type parameter {@code <T>} (stripped in the original, which did
+   * not compile) is restored on all four helpers below.
+   */
+  private <T> T invokeGet(String path,
+      MultivaluedMap<String, String> queryParams, final Class<T> responseClass)
+      throws YarnException {
+    return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.GET, path,
+        queryParams, null, responseClass);
+  }
+
+  /** DELETE {@code path}; no response entity is expected. */
+  private <T> T invokeDelete(String path) throws YarnException {
+    return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.DELETE, path,
+        null, null, null);
+  }
+
+  /** POST {@code body} to {@code path}, unmarshalling the response. */
+  private <T> T invokePost(String path, Object body,
+      final Class<T> responseClass) throws YarnException {
+    return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.POST, path,
+        null, body, responseClass);
+  }
+
+  /** PUT {@code body} to {@code path}; no response entity is expected. */
+  private <T> T invokePut(String path, Object body) throws YarnException {
+    return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.PUT, path,
+        null, body, null);
+  }
+
+  /**
+   * Issues a single HTTP request against the state store proxy and
+   * unmarshals the response entity.
+   *
+   * @param webApp base URL of the proxy web application
+   * @param method HTTP verb to use
+   * @param path resource path below the state store root
+   * @param queryParams optional query parameters, may be null
+   * @param body optional request entity for POST/PUT, may be null
+   * @param responseClass entity type to unmarshal, or null when no entity
+   *          is expected
+   * @return the unmarshalled entity; null on 404 or when no entity was
+   *         requested
+   * @throws YarnException when the proxy returns any other error status
+   */
+  private <T> T invokeFederationStateStoreProxy(String webApp,
+      WebApp.HTTP method, String path,
+      MultivaluedMap<String, String> queryParams, Object body,
+      final Class<T> responseClass) throws YarnException {
+    T obj = null;
+
+    WebResource webResource = client.resource(webApp);
+
+    if (queryParams != null && queryParams.size() > 0) {
+      webResource = webResource.queryParams(queryParams);
+    }
+
+    webResource = webResource.path(webAppResourceRootPath).path(path);
+
+    WebResource.Builder builder = webResource.accept(MediaType.APPLICATION_XML);
+
+    // Parameterized logging avoids the string concatenation when debug is
+    // disabled.
+    LOG.debug("Invoking HTTP REST request with method {} to resource: {}",
+        method.name(), builder);
+    ClientResponse response = null;
+    try {
+      switch (method) {
+      case GET:
+        response = builder.get(ClientResponse.class);
+        break;
+      case PUT:
+        response = builder.put(ClientResponse.class, body);
+        break;
+      case POST:
+        response = builder.post(ClientResponse.class, body);
+        break;
+      case DELETE:
+        response = builder.delete(ClientResponse.class);
+        break;
+      default:
+        throw new YarnRuntimeException(
+            "Unknown HTTP method type " + method.name());
+      }
+
+      if (response.getStatus() == 200) {
+        if (responseClass != null) {
+          obj = response.getEntity(responseClass);
+        }
+      } else if (response.getStatus() == 404) {
+        // Absence is modelled as a null entity, not an exception.
+        return null;
+      } else {
+        throw new FederationStateStoreException(
+            response.getEntity(RemoteExceptionData.class).getMessage());
+      }
+      return obj;
+    } finally {
+      if (response != null) {
+        response.close();
+      }
+      // BUG FIX: the original called client.destroy() here, tearing down
+      // the shared Jersey client after every single request. The client is
+      // created once in init() precisely because creation is expensive;
+      // it is now released only in close().
+    }
+  }
+
+  /**
+   * Creates the shared Jersey client with connect/read timeouts taken from
+   * configuration.
+   *
+   * @param conf configuration supplying the timeout values
+   * @return a configured Jersey client
+   */
+  private Client createJerseyClient(Configuration conf) {
+    // Distinct local name so the 'client' field is not shadowed.
+    Client jerseyClient = Client.create();
+    jerseyClient.setConnectTimeout(conf.getInt(
+        YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS));
+    jerseyClient.setReadTimeout(conf.getInt(
+        YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS));
+    return jerseyClient;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStoreConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStoreConsts.java
new file mode 100644
index 00000000000..b704a1d67fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStoreConsts.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+/**
+ * Constants (paths, URL parameters and query parameters) for
+ * {@code StateStoreWebServiceProtocol}.
+ */
+public final class HttpProxyFederationStateStoreConsts {
+  // Query Parameters
+  public static final String QUERY_SC_FILTER = "filterInactiveSubClusters";
+
+  // URL Parameters
+  public static final String PARAM_SCID = "subcluster";
+  public static final String PARAM_APPID = "appid";
+  public static final String PARAM_QUEUE = "queue";
+
+  // Paths
+  public static final String ROOT = "/ws/v1/statestore";
+
+  public static final String PATH_SUBCLUSTERS = "/subclusters";
+  public static final String PATH_SUBCLUSTERS_SCID =
+      "/subclusters/{" + PARAM_SCID + "}";
+
+  public static final String PATH_POLICY = "/policy";
+  public static final String PATH_POLICY_QUEUE =
+      "/policy/{" + PARAM_QUEUE + "}";
+
+  public static final String PATH_VIP_HEARTBEAT = "/heartbeat";
+
+  public static final String PATH_APP_HOME = "/apphome";
+  public static final String PATH_APP_HOME_APPID =
+      "/apphome/{" + PARAM_APPID + "}";
+
+  public static final String PATH_REGISTER = "/subcluster/register";
+  public static final String PATH_DEREGISTER = "/subcluster/deregister";
+  public static final String PATH_HEARTBEAT = "/subcluster/heartbeat";
+
+  // Constants holder: final class with a private constructor so it can
+  // neither be instantiated nor subclassed.
+  private HttpProxyFederationStateStoreConsts() {
+    // not called
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationHomeSubClusterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationHomeSubClusterDAO.java
new file mode 100644
index 00000000000..f55a7d30c2c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationHomeSubClusterDAO.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.util.Apps;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB-marshallable form of
+ * {@link org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster}:
+ * the home sub-cluster mapping of one application running in the federated
+ * cluster.
+ *
+ * It includes information such as:
+ * <ul>
+ * <li>the application id (as a string)</li>
+ * <li>the home {@link SubClusterIdDAO}</li>
+ * </ul>
+ */
+@XmlRootElement(name = "applicationHomeSubClusterDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ApplicationHomeSubClusterDAO {
+
+ // Application id in its canonical string form (see Apps.toAppID).
+ private String appId;
+ // Home sub-cluster the application is mapped to.
+ private SubClusterIdDAO subClusterId;
+
+ public ApplicationHomeSubClusterDAO(){
+ } // JAXB needs this
+
+ /** Builds the DAO from the record form of the mapping. */
+ public ApplicationHomeSubClusterDAO(ApplicationHomeSubCluster appHome){
+ appId = appHome.getApplicationId().toString();
+ subClusterId = new SubClusterIdDAO(appHome.getHomeSubCluster());
+ }
+
+ /** Builds the DAO from the mapping carried by an add request. */
+ public ApplicationHomeSubClusterDAO(
+ AddApplicationHomeSubClusterRequest request){
+ this(request.getApplicationHomeSubCluster());
+ }
+
+ /** Builds the DAO from the mapping carried by an update request. */
+ public ApplicationHomeSubClusterDAO(
+ UpdateApplicationHomeSubClusterRequest request){
+ this(request.getApplicationHomeSubCluster());
+ }
+
+ /** Converts back to the record form used by the state store API. */
+ public ApplicationHomeSubCluster toApplicationHomeSubCluster(){
+ return ApplicationHomeSubCluster.newInstance(Apps.toAppID(appId),
+ subClusterId.toSubClusterId());
+ }
+
+ public String getAppId(){
+ return appId;
+ }
+
+ public SubClusterIdDAO getSubClusterId(){
+ return subClusterId;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java
new file mode 100644
index 00000000000..0808ba6059e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ * ApplicationsHomeSubClustersDao is a report of the runtime information of the
+ * applications that are running in the federated cluster.
+ *
+ */
+@XmlRootElement(name = "applicationsHomeSubClusterDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ApplicationsHomeSubClusterDAO {
+
+ private ArrayList appHomes =
+ new ArrayList();
+
+ public ApplicationsHomeSubClusterDAO(){
+ } // JAXB needs this
+
+ public ApplicationsHomeSubClusterDAO(List appHomes){
+ for(ApplicationHomeSubCluster appHome : appHomes){
+ this.appHomes.add(new ApplicationHomeSubClusterDAO(appHome));
+ }
+ }
+
+ public List toApplicationsHome(){
+ List ret = new ArrayList<>();
+ for(ApplicationHomeSubClusterDAO appHome : appHomes){
+ ret.add(appHome.toApplicationHomeSubCluster());
+ }
+ return ret;
+ }
+
+ public ArrayList getAppHomes() {
+ return appHomes;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterDeregisterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterDeregisterDAO.java
new file mode 100644
index 00000000000..a835aa65a59
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterDeregisterDAO.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB DAO mirroring {@link SubClusterDeregisterRequest}: the request sent
+ * to move a subcluster out of service, e.g. to SC_DECOMMISSIONED, SC_LOST,
+ * or SC_UNREGISTERED.
+ *
+ * The update includes details such as:
+ *
+ * - {@link SubClusterId}
+ * - {@link SubClusterState}
+ *
+ * Field names (scId, scState) are part of the XML wire format.
+ */
+@XmlRootElement(name = "deregisterDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterDeregisterDAO {
+
+  private SubClusterIdDAO scId;
+  private SubClusterStateDAO scState;
+
+  public SubClusterDeregisterDAO(){
+  } // JAXB needs this
+
+  public SubClusterDeregisterDAO(SubClusterId id, SubClusterState state){
+    this.scId = new SubClusterIdDAO(id);
+    this.scState = new SubClusterStateDAO(state);
+  }
+
+  public SubClusterDeregisterDAO(SubClusterDeregisterRequest request) {
+    this(request.getSubClusterId(), request.getState());
+  }
+
+  public SubClusterIdDAO getSubClusterId(){
+    return scId;
+  }
+
+  public SubClusterStateDAO getScState() {
+    return scState;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java
new file mode 100644
index 00000000000..a40d1efce50
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB DAO mirroring {@link SubClusterHeartbeatRequest}: the periodic
+ * report a subcluster RM sends to indicate liveliness.
+ *
+ * It carries:
+ *
+ * - the {@link SubClusterId} of the reporting subcluster
+ * - the current {@link SubClusterState}
+ * - the capability string of the subcluster (opaque at this layer;
+ *   presumably the serialized capacity/utilization report -- confirm
+ *   against the RM heartbeat producer)
+ *
+ * Field names (scId, scState, capability) are part of the XML wire
+ * format because of XmlAccessType.FIELD.
+ */
+@XmlRootElement(name = "heartbeatDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterHeartbeatDAO {
+
+  private SubClusterIdDAO scId;
+  private SubClusterStateDAO scState;
+  private String capability;
+
+  public SubClusterHeartbeatDAO(){
+  } // JAXB needs this
+
+  public SubClusterHeartbeatDAO(SubClusterId id, SubClusterState state, String capability){
+    this.scId = new SubClusterIdDAO(id);
+    this.scState = new SubClusterStateDAO(state);
+    this.capability = capability;
+  }
+
+  public SubClusterHeartbeatDAO(SubClusterHeartbeatRequest request){
+    this(request.getSubClusterId(), request.getState(),
+        request.getCapability());
+  }
+
+  public SubClusterIdDAO getSubClusterId(){
+    return scId;
+  }
+
+  public SubClusterStateDAO getScState() {
+    return scState;
+  }
+
+  public String getCapability(){
+    return capability;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java
new file mode 100644
index 00000000000..26aa344cf42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB DAO wrapping {@link SubClusterId}, the globally unique identifier
+ * of a subcluster participating in federation.
+ *
+ * NOTE(review): the root element name "subClustersIdDAO" looks plural
+ * although this wraps a single id; renaming it would change the wire
+ * format, so it is left as-is -- confirm against the HTTP proxy client
+ * before changing.
+ */
+@XmlRootElement(name = "subClustersIdDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterIdDAO {
+
+  private String subClusterId;
+
+  public SubClusterIdDAO() {
+  } // JAXB needs this
+
+  public SubClusterIdDAO(SubClusterId scId) {
+    subClusterId = scId.getId();
+  }
+
+  public SubClusterId toSubClusterId(){
+    return SubClusterId.newInstance(subClusterId);
+  }
+
+  public String getSubClusterId() {
+    return subClusterId;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java
new file mode 100644
index 00000000000..5b75affb1b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB DAO mirroring {@link SubClusterInfo}: a report of the runtime
+ * information of a subcluster that is participating in federation.
+ *
+ * It includes information such as:
+ * - {@link SubClusterId}
+ * - the AM/client/admin/web service addresses of the subcluster
+ * - the timestamps of the last heartbeat and the last start
+ * - the {@link SubClusterStateDAO state} of the subcluster
+ * - the capability string of the subcluster
+ *
+ * NOTE(review): the constructor stores SubClusterId.toString() while
+ * toSubClusterInfo() rebuilds it via SubClusterId.newInstance(String);
+ * this round-trips only if toString() returns the bare id -- confirm.
+ */
+@XmlRootElement(name = "subClusterInfoDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterInfoDAO {
+
+  private String subClusterId;
+  private String amRMServiceAddress;
+  private String clientRMServiceAddress;
+  private String rmAdminServiceAddress;
+  private String rmWebServiceAddress;
+  private long lastHeartBeat;
+  private SubClusterStateDAO state;
+  private long lastStartTime;
+  private String capability;
+
+  public SubClusterInfoDAO() {
+  } // JAXB needs this
+
+  public SubClusterInfoDAO(SubClusterInfo scinfo) {
+    this.subClusterId = scinfo.getSubClusterId().toString();
+    this.amRMServiceAddress = scinfo.getAMRMServiceAddress();
+    this.clientRMServiceAddress = scinfo.getClientRMServiceAddress();
+    this.rmAdminServiceAddress = scinfo.getRMAdminServiceAddress();
+    this.rmWebServiceAddress = scinfo.getRMWebServiceAddress();
+    this.lastHeartBeat = scinfo.getLastHeartBeat();
+    this.state = new SubClusterStateDAO(scinfo.getState());
+    this.lastStartTime = scinfo.getLastStartTime();
+    this.capability = scinfo.getCapability();
+  }
+
+  public SubClusterInfo toSubClusterInfo() {
+    return SubClusterInfo.newInstance(
+        SubClusterId.newInstance(subClusterId),
+        amRMServiceAddress,
+        clientRMServiceAddress,
+        rmAdminServiceAddress,
+        rmWebServiceAddress,
+        this.lastHeartBeat,
+        state.getSubClusterState(),
+        lastStartTime,
+        capability);
+  }
+
+  public String getSubClusterId() {
+    return subClusterId;
+  }
+
+  public String getClientRMServiceAddress() {
+    return clientRMServiceAddress;
+  }
+
+  public String getAmRMServiceAddress() {
+    return amRMServiceAddress;
+  }
+
+  public long getLastHeartBeat() {
+    return lastHeartBeat;
+  }
+
+  public long getLastStartTime() {
+    return lastStartTime;
+  }
+
+  public String getCapability() {
+    return capability;
+  }
+
+  public String getRmAdminServiceAddress() {
+    return rmAdminServiceAddress;
+  }
+
+  public String getRmWebServiceAddress() {
+    return rmWebServiceAddress;
+  }
+
+  public SubClusterStateDAO getState() {
+    return state;
+  }
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java
new file mode 100644
index 00000000000..230a80f4edc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * SubClusterPoliciesConfigurationsDAO is a JAXB-serializable set of
+ * {@link SubClusterPolicyConfigurationDAO} policies.
+ */
+@XmlRootElement(name = "subClusterPoliciesConfigurationsDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterPoliciesConfigurationsDAO {
+
+  private ArrayList<SubClusterPolicyConfigurationDAO> policies =
+      new ArrayList<>();
+
+  public SubClusterPoliciesConfigurationsDAO() {
+  } // JAXB needs this
+
+  public SubClusterPoliciesConfigurationsDAO(
+      Collection<SubClusterPolicyConfiguration> policies) {
+    for(SubClusterPolicyConfiguration entry : policies) {
+      this.policies.add(new SubClusterPolicyConfigurationDAO(entry));
+    }
+  }
+
+  public ArrayList<SubClusterPolicyConfigurationDAO> getPolicies() {
+    return policies;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPolicyConfigurationDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPolicyConfigurationDAO.java
new file mode 100644
index 00000000000..cba8a65a90e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPolicyConfigurationDAO.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link SubClusterPolicyConfigurationDAO} is a class that represents a
+ * configuration of a policy. For a single queue, it contains a policy type
+ * (resolve to a class name) and its params as an opaque {@link ByteBuffer}.
+ *
+ * Note: by design the params are an opaque ByteBuffer, this allows for enough
+ * flexibility to evolve the policies without impacting the protocols to/from
+ * the federation state store. The bytes are carried Base64-encoded so they
+ * survive XML/JSON serialization.
+ */
+@XmlRootElement(name = "subClusterPolicyConfigurationDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterPolicyConfigurationDAO {
+  private String queueName;
+  private String policyType;
+  private String policyParams;
+
+  public SubClusterPolicyConfigurationDAO() {
+  } // JAXB needs this
+
+  public SubClusterPolicyConfigurationDAO(SubClusterPolicyConfiguration policy) {
+    this.queueName = policy.getQueue();
+    this.policyType = policy.getType();
+    // Copy only the readable window of the buffer; ByteBuffer.array()
+    // returns the whole backing array and ignores position/limit (and
+    // throws if the buffer is read-only or not array-backed).
+    ByteBuffer params = policy.getParams().duplicate();
+    byte[] bytes = new byte[params.remaining()];
+    params.get(bytes);
+    this.policyParams = Base64.encodeBase64String(bytes);
+  }
+
+  public SubClusterPolicyConfiguration toSubClusterPolicyConfiguration() {
+    return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
+        ByteBuffer.wrap(Base64.decodeBase64(policyParams)));
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public String getPolicyType() {
+    return policyType;
+  }
+
+  public String getPolicyParams() {
+    return policyParams;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java
new file mode 100644
index 00000000000..ec76d371922
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB DAO wrapping {@link SubClusterState}. The nested SCState enum
+ * mirrors SubClusterState value-by-name, so both conversion directions
+ * rely on Enum.valueOf of the string form and will throw if the two
+ * enums ever diverge.
+ */
+@XmlRootElement(name = "subClusterStateDAO")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterStateDAO {
+
+  protected enum SCState {
+    SC_NEW,
+
+    /** Subcluster is registered and the RM sent a heartbeat recently. */
+    SC_RUNNING,
+
+    /** Subcluster is unhealthy. */
+    SC_UNHEALTHY,
+
+    /** Subcluster is in the process of being out of service. */
+    SC_DECOMMISSIONING,
+
+    /** Subcluster is out of service. */
+    SC_DECOMMISSIONED,
+
+    /** RM has not sent a heartbeat for some configured time threshold. */
+    SC_LOST,
+
+    /** Subcluster has unregistered. */
+    SC_UNREGISTERED;
+  };
+
+  protected SCState scState;
+
+  public SubClusterStateDAO() {
+  } // JAXB needs this
+
+  public SubClusterStateDAO(SubClusterState state) {
+    scState = SCState.valueOf(state.toString());
+  }
+
+  public SubClusterState getSubClusterState() {
+    return SubClusterState.valueOf(scState.toString());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java
new file mode 100644
index 00000000000..9e64fd29118
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ *
+ * SubClustersInfoDAO is a JAXB-serializable set of {@link SubClusterInfoDAO}.
+ *
+ */
+@XmlRootElement(name = "subClustersInfoMap")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClustersInfoDAO {
+
+  private ArrayList<SubClusterInfoDAO> subClusters =
+      new ArrayList<>();
+
+  public SubClustersInfoDAO() {
+  } // JAXB needs this
+
+  public SubClustersInfoDAO(Collection<SubClusterInfo> subClustersInfo) {
+    for(SubClusterInfo entry : subClustersInfo) {
+      subClusters.add(new SubClusterInfoDAO(entry));
+    }
+  }
+
+  public ArrayList<SubClusterInfoDAO> getSubClusters() {
+    return subClusters;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/package-info.java
new file mode 100644
index 00000000000..dc7df95716e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Federation StateStore DAO package.*/
+package org.apache.hadoop.yarn.server.federation.store.records.dao;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 60b794f36d1..448da9fc6f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -52,8 +52,11 @@
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
@@ -62,9 +65,14 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -220,6 +228,33 @@ public static FederationStateStoreFacade getInstance() {
return FACADE;
}
+  /**
+   * Register a subcluster identified by {@code SubClusterId} to
+   * indicate participation in federation.
+   *
+   * @param info the SubClusterInfo for the SubCluster that is participating
+   *          in federation
+   * @throws YarnException if the call to the state store is unsuccessful
+   */
+  public void registerSubCluster(SubClusterInfo info) throws YarnException {
+    stateStore.registerSubCluster(SubClusterRegisterRequest.newInstance(info));
+  }
+
+  /**
+   * Deregister a subcluster identified by {@code SubClusterId} to
+   * change state in federation. This can be done to mark the sub cluster
+   * lost, deregistered, or decommissioned.
+   *
+   * @param subClusterId the target subclusterId
+   * @param subClusterState the state to update it to
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void deregisterSubCluster(SubClusterId subClusterId,
+      SubClusterState subClusterState) throws YarnException {
+    stateStore.deregisterSubCluster(
+        SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState));
+  }
+
/**
* Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
@@ -335,6 +370,18 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(
}
}
+  /**
+   * Set a policy configuration for a queue into the federation state store.
+   *
+   * @param policyConf the policy configuration to set for its target queue
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
+      throws YarnException {
+    stateStore.setPolicyConfiguration(
+        SetSubClusterPolicyConfigurationRequest.newInstance(policyConf));
+  }
+
/**
* Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
*
@@ -382,6 +429,36 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
+  /**
+   * Delete the mapping of home {@code SubClusterId} of a previously submitted
+   * {@code ApplicationId}. Currently response is empty if the operation was
+   * successful, if not an exception reporting reason for a failure.
+   *
+   * @param applicationId the application to delete the home sub-cluster of
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+      throws YarnException {
+    stateStore.deleteApplicationHomeSubCluster(
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+  }
+
+  /**
+   * Returns the home sub-cluster mapping of every application known to
+   * the federation state store, as {@link ApplicationHomeSubCluster}
+   * records.
+   *
+   * @return the list of home sub-cluster mappings of all applications
+   * @throws YarnException if the call to the state store is unsuccessful
+   */
+  public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster()
+      throws YarnException {
+    GetApplicationsHomeSubClusterResponse response =
+        stateStore.getApplicationsHomeSubCluster(
+            GetApplicationsHomeSubClusterRequest.newInstance());
+    return response.getAppsHomeSubClusters();
+  }
+
/**
* Get the singleton instance of SubClusterResolver.
*
@@ -400,6 +477,20 @@ public Configuration getConf() {
return this.conf;
}
+  /**
+   * Periodic heartbeat from a ResourceManager to indicate liveliness.
+   *
+   * @param subClusterId the id of the sub cluster heart beating
+   * @param state the state of the sub cluster
+   * @param capability the capability of the sub cluster
+   * @throws YarnException if the call to the state store is unsuccessful
+   */
+  public void subClusterHeartBeat(SubClusterId subClusterId,
+      SubClusterState state, String capability) throws YarnException {
+    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
+        .newInstance(subClusterId, state, capability));
+  }
+
/**
* Helper method to create instances of Object using the class name defined in
* the configuration object. The instances creates {@link RetryProxy} using
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
new file mode 100644
index 00000000000..d3f7d383760
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router that
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtocol}
+ * messages from the client to the cluster FederationStateStore. It listens
+ * for {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request intercepting pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference with
+ * AMRMProxyService is the protocol they implement.
+ **/
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+ implements StateStoreWebServiceProtocol{
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+ private final Router router;
+ private final Configuration conf;
+ private @Context HttpServletResponse response;
+
+ private Map userPipelineMap;
+
+ @Inject
+ public RouterStateStoreProxyWebServices(final Router router,
+ Configuration conf) {
+ this.router = router;
+ this.conf = conf;
+ int maxCacheSize =
+ conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+ YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+ this.userPipelineMap = Collections.synchronizedMap(
+ new LRUCacheHashMap(
+ maxCacheSize, true));
+ }
+
+ /**
+ * Returns the comma separated intercepter class names from the configuration.
+ *
+ * @param config the configuration to read the interceptor class names from
+ * @return the intercepter class names as an instance of ArrayList
+ */
+ private List getInterceptorClassNames(Configuration config) {
+ String configuredInterceptorClassNames =
+ config.get(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
+ YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS);
+
+ List interceptorClassNames = new ArrayList();
+ Collection tempList =
+ StringUtils.getStringCollection(configuredInterceptorClassNames);
+ for (String item : tempList) {
+ interceptorClassNames.add(item.trim());
+ }
+
+ return interceptorClassNames;
+ }
+
+ private void init() {
+ // clear content type
+ response.setContentType(null);
+ }
+
+ @POST
+ @Path(HttpProxyFederationStateStoreConsts.PATH_REGISTER)
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+ throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().registerSubCluster(scInfoDAO);
+ }
+
+ @POST
+ @Path(HttpProxyFederationStateStoreConsts.PATH_DEREGISTER)
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO)
+ throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().deregisterSubCluster(deregisterDAO);
+ }
+
+ @POST
+ @Path(HttpProxyFederationStateStoreConsts.PATH_HEARTBEAT)
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO)
+ throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().subClusterHeartBeat(heartbeatDAO);
+ }
+
+ @GET
+ @Path(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS_SCID)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response getSubCluster(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_SCID)
+ String subClusterId) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().getSubCluster(subClusterId);
+ }
+
+ @GET
+ @Path(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response getSubClusters(
+ @QueryParam(HttpProxyFederationStateStoreConsts.QUERY_SC_FILTER)
+ @DefaultValue("true")
+ boolean filterInactiveSubClusters) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().getSubClusters(
+ filterInactiveSubClusters);
+ }
+
+ @GET
+ @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response getPoliciesConfigurations()
+ throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().getPoliciesConfigurations();
+ }
+
+ @GET
+ @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY_QUEUE)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response getPolicyConfiguration(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_QUEUE)
+ String queue) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().getPolicyConfiguration(queue);
+ }
+
+ @POST
+ @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY)
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response setPolicyConfiguration(
+ SubClusterPolicyConfigurationDAO policyConf) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().setPolicyConfiguration(policyConf);
+ }
+
+ @POST
+ @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME)
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response addApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().addApplicationHomeSubCluster(
+ appHomeDAO);
+ }
+
+ @PUT
+ @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME)
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response updateApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().updateApplicationHomeSubCluster(
+ appHomeDAO);
+ }
+
+ @GET
+ @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME_APPID)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response getApplicationHomeSubCluster(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID)
+ String appId) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().getApplicationHomeSubCluster(appId);
+ }
+
+ @GET
+ @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response getApplicationsHomeSubCluster() throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().getApplicationsHomeSubCluster();
+ }
+
+ @DELETE
+ @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME_APPID)
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response deleteApplicationHomeSubCluster(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID)
+ String appId) throws YarnException {
+ init();
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+ return pipeline.getRootInterceptor().deleteApplicationHomeSubCluster(appId);
+ }
+
+ @VisibleForTesting
+ protected RequestInterceptorChainWrapper getInterceptorChain(
+ HttpServletRequest hsr) {
+ String user = "";
+ if (hsr != null) {
+ user = hsr.getRemoteUser();
+ }
+ try {
+ if (user == null || user.equals("")) {
+ // Yarn Router user
+ user = UserGroupInformation.getCurrentUser().getUserName();
+ }
+ } catch (IOException e) {
+ LOG.error("IOException " + e.getMessage());
+ }
+ RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
+ if (chain != null && chain.getRootInterceptor() != null) {
+ return chain;
+ }
+ return initializePipeline(user);
+ }
+
+ /**
+ * Gets the request intercepter chains for all the users.
+ *
+ * @return the request intercepter chains.
+ */
+ @VisibleForTesting
+ protected Map getPipelines() {
+ return this.userPipelineMap;
+ }
+
+ /**
+ * This method creates and returns a reference to the first intercepter in
+ * the chain of request intercepter instances.
+ *
+ * @return the reference of the first intercepter in the chain
+ */
+ @VisibleForTesting
+ protected StateStoreProxyInterceptor createRequestInterceptorChain() {
+
+ List interceptorClassNames = getInterceptorClassNames(conf);
+
+ StateStoreProxyInterceptor pipeline = null;
+ StateStoreProxyInterceptor current = null;
+ for (String interceptorClassName : interceptorClassNames) {
+ try {
+ Class> interceptorClass = conf.getClassByName(interceptorClassName);
+ if (StateStoreProxyInterceptor.class.isAssignableFrom(interceptorClass)) {
+ StateStoreProxyInterceptor interceptorInstance =
+ (StateStoreProxyInterceptor) ReflectionUtils
+ .newInstance(interceptorClass, conf);
+ if (pipeline == null) {
+ pipeline = interceptorInstance;
+ current = interceptorInstance;
+ continue;
+ } else {
+ current.setNextInterceptor(interceptorInstance);
+ current = interceptorInstance;
+ }
+ } else {
+ throw new YarnRuntimeException(
+ "Class: " + interceptorClassName + " not instance of "
+ + StateStoreProxyInterceptor.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate RESTRequestInterceptor: "
+ + interceptorClassName,
+ e);
+ }
+ }
+
+ if (pipeline == null) {
+ throw new YarnRuntimeException(
+ "RequestInterceptor pipeline is not configured in the system");
+ }
+ return pipeline;
+ }
+
+ /**
+ * Initializes the request intercepter pipeline for the specified user.
+ *
+ * @param user the user for whom the interceptor pipeline is initialized
+ */
+ private RequestInterceptorChainWrapper initializePipeline(String user) {
+ synchronized (this.userPipelineMap) {
+ if (this.userPipelineMap.containsKey(user)) {
+ LOG.info("Request to start an already existing user: {}"
+ + " was received, so ignoring.", user);
+ return userPipelineMap.get(user);
+ }
+
+ RequestInterceptorChainWrapper chainWrapper =
+ new RequestInterceptorChainWrapper();
+ try {
+ // We should init the pipeline instance after it is created and then
+ // add to the map, to ensure thread safe.
+ LOG.info("Initializing request processing pipeline for user: {}", user);
+ StateStoreProxyInterceptor interceptorChain =
+ this.createRequestInterceptorChain();
+ interceptorChain.init(user);
+ chainWrapper.init(interceptorChain);
+ } catch (Exception e) {
+ LOG.error("Init StateStoreProxyInterceptor error for user: " + user, e);
+ throw e;
+ }
+
+ this.userPipelineMap.put(user, chainWrapper);
+ return chainWrapper;
+ }
+ }
+
+ /**
+ * Private structure for encapsulating RequestInterceptor and user instances.
+ *
+ */
+ @Private
+ public static class RequestInterceptorChainWrapper {
+ private StateStoreProxyInterceptor rootInterceptor;
+
+ /**
+ * Initializes the wrapper with the specified parameters.
+ *
+ * @param interceptor the first interceptor in the pipeline
+ */
+ public synchronized void init(StateStoreProxyInterceptor interceptor) {
+ this.rootInterceptor = interceptor;
+ }
+
+ /**
+ * Gets the root request intercepter.
+ *
+ * @return the root request intercepter
+ */
+ public synchronized StateStoreProxyInterceptor getRootInterceptor() {
+ return rootInterceptor;
+ }
+
+ /**
+ * Shutdown the chain of interceptors when the object is destroyed.
+ */
+ @Override
+ protected void finalize() {
+ rootInterceptor.shutdown();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
new file mode 100644
index 00000000000..966ae138050
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterStateDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends the {@code StateStoreProxyInterceptor} class and provides an
+ * implementation that simply forwards the client requests to the
+ * FederationStateStore.
+ */
+public class StateStoreProxyDefaultInterceptor implements
+ StateStoreProxyInterceptor {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(StateStoreProxyDefaultInterceptor.class.getName());
+
+ private FederationStateStoreFacade stateStoreFacade;
+ private @Context Configuration conf;
+
+ @Override
+ public void init(String user) {
+ this.stateStoreFacade = FederationStateStoreFacade.getInstance();
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor) {
+ throw new YarnRuntimeException(this.getClass().getName()
+ + " should be the last interceptor");
+ }
+
+ @Override
+ public StateStoreProxyInterceptor getNextInterceptor() {
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+ throws YarnException {
+ LOG.info("Registering SubCluster {}", scInfoDAO.getSubClusterId());
+ try {
+ stateStoreFacade.registerSubCluster(scInfoDAO.toSubClusterInfo());
+ } catch (YarnException e) {
+ LOG.error("Could not register SubCluster {}", scInfoDAO.getSubClusterId(), e);
+ throw e;
+ }
+ LOG.info("Registered SubCluster {}", scInfoDAO.getSubClusterId());
+ return Response.status(Response.Status.OK).build();
+ }
+
+ @Override
+ public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO)
+ throws YarnException {
+ SubClusterIdDAO scIdDAO = deregisterDAO.getSubClusterId();
+ SubClusterStateDAO scStateDAO = deregisterDAO.getScState();
+ LOG.info("Deregistering SubCluster {}", scIdDAO.getSubClusterId());
+ try {
+ stateStoreFacade
+ .deregisterSubCluster(SubClusterId.newInstance(scIdDAO.getSubClusterId()),
+ scStateDAO.getSubClusterState());
+ } catch (YarnException e) {
+ LOG.error("Could not deregister SubCluster {}", scIdDAO.getSubClusterId(), e);
+ throw e;
+ }
+ LOG.info("Deregistered SubCluster {}", scIdDAO.getSubClusterId());
+ return Response.status(Response.Status.OK).build();
+ }
+
+ @Override
+ public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO)
+ throws YarnException {
+ SubClusterIdDAO scIdDAO = heartbeatDAO.getSubClusterId();
+ SubClusterStateDAO scStateDAO = heartbeatDAO.getScState();
+ LOG.info("Heartbeating for SubCluster {}", scIdDAO.getSubClusterId());
+ try {
+ stateStoreFacade
+ .subClusterHeartBeat(SubClusterId.newInstance(scIdDAO.getSubClusterId()),
+ scStateDAO.getSubClusterState(), heartbeatDAO.getCapability());
+ } catch (YarnException e) {
+ LOG.error("Could not heartbeat for SubCluster {}", scIdDAO.getSubClusterId(),
+ e);
+ throw e;
+ }
+ LOG.info("Heartbeat for SubCluster {}", scIdDAO.getSubClusterId());
+ return Response.status(Response.Status.OK).build();
+ }
+
+ @Override
+ public Response getSubCluster(
+ String subClusterId) throws YarnException {
+
+ if (subClusterId == null || subClusterId.isEmpty()) {
+ throw new NotFoundException("subClusterId is empty or null");
+ }
+
+ LOG.debug("Fetching subcluster info for subcluster: " + subClusterId);
+
+ SubClusterInfo resp =
+ stateStoreFacade.getSubCluster(SubClusterId.newInstance(subClusterId));
+
+ LOG.debug("Retrieved subcluster info for subcluster: " + subClusterId
+ + ". Subcluster details:" + resp);
+
+ if(resp == null){
+ LOG.warn("Subcluster {} does not exist", subClusterId);
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ return Response.status(Response.Status.OK).entity(
+ new SubClusterInfoDAO(resp)).build();
+ }
+
+ @Override
+ public Response getSubClusters(
+ boolean filterInactiveSubClusters) throws YarnException {
+ LOG.debug("Fetching info for all subclusters. filterInactiveSubClusters="
+ + filterInactiveSubClusters);
+
+ Map resp =
+ stateStoreFacade.getSubClusters(filterInactiveSubClusters);
+
+ LOG.info(
+ "Retrieved subcluster info for all subclusters. Subcluster count is="
+ + resp.size());
+
+ return Response.status(Response.Status.OK).entity(
+ new SubClustersInfoDAO(resp.values())).build();
+ }
+
+ @Override
+ public Response getPoliciesConfigurations()
+ throws YarnException {
+ LOG.debug("Fetching policy info for all queues");
+
+ Map resp =
+ stateStoreFacade.getPoliciesConfigurations();
+
+ LOG.debug(
+ "Retrieved policy info for all queues. Policy count is=" + resp.size());
+
+ return Response.status(Response.Status.OK).entity(
+ new SubClusterPoliciesConfigurationsDAO(resp.values())).build();
+ }
+
+ @Override
+ public Response getPolicyConfiguration(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_QUEUE)
+ String queue) throws YarnException {
+
+ if (queue == null || queue.isEmpty()) {
+ throw new NotFoundException("queue name is empty or null");
+ }
+
+ LOG.debug("Fetching policy info for queue: " + queue);
+
+ SubClusterPolicyConfiguration resp =
+ stateStoreFacade.getPolicyConfiguration(queue);
+
+ if(resp == null){
+ LOG.warn("Policy for queue {} does not exist", queue);
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ LOG.debug("Retrieved policy info for queue: " + queue + ". Policy details:"
+ + resp);
+
+ return Response.status(Response.Status.OK).entity(
+ new SubClusterPolicyConfigurationDAO(resp)).build();
+ }
+
+ @Override
+ public Response setPolicyConfiguration(
+ SubClusterPolicyConfigurationDAO policyConf) throws YarnException {
+
+
+ String queue = policyConf.getQueueName();
+
+ if (queue == null || queue.isEmpty()) {
+ throw new NotFoundException("queue name is empty or null");
+ }
+ LOG.info("Setting policy info for queue: " + queue);
+
+ try {
+ stateStoreFacade.setPolicyConfiguration(
+ policyConf.toSubClusterPolicyConfiguration());
+ } catch (YarnException e) {
+ LOG.error("Could not set policy ", e);
+ throw e;
+ }
+
+ LOG.info("Set policy info for queue: " + queue + ". Policy details:");
+ return Response.status(Response.Status.OK).build();
+ }
+
+ @Override
+ public Response addApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ LOG.info("Adding home SubCluster for application {} as {}",
+ appHomeDAO.getAppId(), appHomeDAO.getSubClusterId().getSubClusterId());
+ SubClusterId ret;
+ try {
+ ret = stateStoreFacade.addApplicationHomeSubCluster(
+ appHomeDAO.toApplicationHomeSubCluster());
+ } catch (YarnException e) {
+ LOG.error("Could not add home SubCluster ", e);
+ throw e;
+ }
+ LOG.info("Added home SubCluster for application {} as {}",
+ appHomeDAO.getAppId(), appHomeDAO.getSubClusterId().getSubClusterId());
+ return Response.status(Response.Status.OK).entity(
+ new SubClusterIdDAO(ret)).build();
+ }
+
+ @Override
+ public Response updateApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ LOG.info("Updating home SubCluster for application {} to {}",
+ appHomeDAO.getAppId(), appHomeDAO.getSubClusterId());
+ SubClusterId ret;
+ try {
+ stateStoreFacade.updateApplicationHomeSubCluster(
+ appHomeDAO.toApplicationHomeSubCluster());
+ } catch (YarnException e) {
+ LOG.error("Could not update home SubCluster ", e);
+ throw e;
+ }
+ LOG.info("Updating home SubCluster for application {} as {}",
+ appHomeDAO.getAppId(), appHomeDAO.getSubClusterId());
+ return Response.status(Response.Status.OK).build();
+ }
+
+ @Override
+ public Response getApplicationHomeSubCluster(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID)
+ String appId) throws YarnException {
+ LOG.debug("Getting home SubCluster for application {}", appId);
+ SubClusterId resp;
+ try {
+ resp = stateStoreFacade.getApplicationHomeSubCluster(Apps.toAppID(appId));
+ } catch (YarnException e) {
+ LOG.error("Could not get home SubCluster ", e);
+ throw e;
+ }
+
+ if(resp == null){
+ LOG.warn("Home subcluster for application {} does not exist", appId);
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ LOG.info("Retrieved home SubCluster for application {}", appId);
+ return Response.status(Response.Status.OK).entity(
+ new SubClusterIdDAO(resp)).build();
+ }
+
+ @Override
+ public Response getApplicationsHomeSubCluster() throws YarnException {
+ LOG.debug("Getting home SubCluster for applications");
+ List resp;
+ try {
+ resp = stateStoreFacade.getApplicationsHomeSubCluster();
+ } catch (YarnException e) {
+ LOG.error("Could not get home SubClusters ", e);
+ throw e;
+ }
+
+ LOG.info("Retrieved home SubCluster for applications {}", resp);
+ return Response.status(Response.Status.OK).entity(
+ new ApplicationsHomeSubClusterDAO(resp)).build();
+ }
+
+ @Override
+ public Response deleteApplicationHomeSubCluster(
+ @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID)
+ String appId) throws YarnException {
+ LOG.info("Deleting home SubCluster for application {}", appId);
+ try {
+ stateStoreFacade.deleteApplicationHomeSubCluster(Apps.toAppID(appId));
+ } catch (YarnException e) {
+ LOG.error("Could not delete home SubCluster ", e);
+ throw e;
+ }
+ LOG.info("Deleted home SubCluster for application {}", appId);
+ return Response.status(Response.Status.OK).build();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyInterceptor.java
new file mode 100644
index 00000000000..21b779c1f2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyInterceptor.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the client to
+ * the federation state store.
+ *
+ * This class includes all methods that the StateStore implements but in
+ * REST format.
+ */
+public interface StateStoreProxyInterceptor extends Configurable,
+ StateStoreWebServiceProtocol {
+
+ /**
+ * This method is called for initializing the intercepter. This is guaranteed
+ * to be called only once in the lifetime of this instance.
+ *
+ * @param user the name of the client
+ */
+ void init(String user);
+
+ /**
+ * This method is called to release the resources held by the intercepter.
+ * This will be called when the application pipeline is being destroyed. The
+ * concrete implementations should dispose the resources and forward the
+ * request to the next intercepter, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next intercepter in the pipeline. The concrete implementation of
+ * this interface should always pass the request to the nextInterceptor after
+ * inspecting the message. The last intercepter in the chain is responsible to
+ * send the messages to the federation state store and so the last
+ * intercepter will not receive this method call.
+ *
+ * @param nextInterceptor the StateStoreProxyInterceptor to set in the pipeline
+ */
+ void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor);
+
+ /**
+ * Returns the next intercepter in the chain.
+ *
+ * @return the next intercepter in the chain
+ */
+ StateStoreProxyInterceptor getNextInterceptor();
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java
new file mode 100644
index 00000000000..85aea80d128
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+
+/**
+ * Extends the {@code StateStoreProxyInterceptor} class and provides an
+ * implementation that simply rejects all the client requests. It is meant to
+ * be the terminal interceptor of a pipeline: {@link #setNextInterceptor}
+ * always throws.
+ */
+public class StateStoreProxyRejectInterceptor implements
+ StateStoreProxyInterceptor {
+
+ // HTTP 501 Not Implemented; kept as a literal constant because the JAX-RS
+ // 1.x Response.Status enum used by this module has no NOT_IMPLEMENTED.
+ private static final int SC_NOT_IMPLEMENTED = 501;
+
+ // NOTE(review): initialized in init() but never read afterwards — confirm
+ // whether it is needed or can be removed.
+ private FederationStateStoreFacade stateStoreFacade;
+ private @Context Configuration conf;
+
+ /**
+ * Builds the common "501 Not Implemented" response returned by every
+ * endpoint of this interceptor.
+ *
+ * @return a response indicating the request was rejected at this
+ * interceptor
+ */
+ private static Response reject() {
+ return Response.status(SC_NOT_IMPLEMENTED).entity("Request reached " +
+ StateStoreProxyRejectInterceptor.class.getName()).build();
+ }
+
+ @Override
+ public void init(String user) {
+ this.stateStoreFacade = FederationStateStoreFacade.getInstance();
+ }
+
+ @Override
+ public void shutdown() {
+ // No resources to release.
+ }
+
+ @Override
+ public void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor) {
+ // This interceptor rejects everything, so chaining past it is a
+ // configuration error.
+ throw new YarnRuntimeException(
+ this.getClass().getName() + " should be the last interceptor");
+ }
+
+ @Override
+ public StateStoreProxyInterceptor getNextInterceptor() {
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+ throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO)
+ throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO)
+ throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response getSubCluster(String subClusterId) throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response getSubClusters(boolean filterInactiveSubclusters)
+ throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response getPoliciesConfigurations() throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response getPolicyConfiguration(String queue) throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response setPolicyConfiguration(
+ SubClusterPolicyConfigurationDAO policyConf) throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response addApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response updateApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response getApplicationHomeSubCluster(String appId)
+ throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response getApplicationsHomeSubCluster() throws YarnException {
+ return reject();
+ }
+
+ @Override
+ public Response deleteApplicationHomeSubCluster(String appId)
+ throws YarnException {
+ return reject();
+ }
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java
new file mode 100644
index 00000000000..b0628eee0b2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+
+import javax.ws.rs.core.Response;
+
+/**
+ * The protocol between clients and the FederationStateStore
+ * over REST APIs.
+ */
+
+public interface StateStoreWebServiceProtocol {
+
+ /**
+ * Register a subcluster by publishing capabilities as represented by
+ * {@code SubClusterInfo} to indicate participation in federation. This is
+ * typically done during initialization or restart/failover of the
+ * subcluster's ResourceManager. Upon successful registration, an
+ * identifier for the subcluster which is unique across the federated
+ * cluster is returned. The identifier is static, i.e. preserved across
+ * restarts and failover.
+ *
+ * @param scInfoDAO the capabilities of the subcluster that
+ * wants to participate in federation. The subcluster id is also
+ * specified in case registration is triggered by restart/failover
+ * @return an empty response if registration was successful
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response registerSubCluster(SubClusterInfoDAO scInfoDAO) throws YarnException;
+
+ /**
+ * Deregister a subcluster identified by {@code SubClusterId} to
+ * change state in federation. This can be done to mark the sub cluster lost,
+ * deregistered, or decommissioned.
+ *
+ * @param deregisterDAO the request to deregister the
+ * sub-cluster from federation
+ * @return an empty response on successfully deregistering the subcluster
+ * state
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO)
+ throws YarnException;
+
+ /**
+ * Periodic heartbeat from a ResourceManager participating in
+ * federation to indicate liveliness. The heartbeat publishes the current
+ * capabilities as represented by {@code SubClusterInfo} of the subcluster.
+ * Currently the response is empty if the operation was successful; if not,
+ * an exception reports the reason for the failure.
+ *
+ * @param heartbeatDAO the capabilities of the subcluster that
+ * wants to keep alive its participation in federation
+ * @return an empty response if the heartbeat was successfully processed
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO)
+ throws YarnException;
+
+ /**
+ * Get the membership information of subcluster as identified by
+ * {@code SubClusterId}. The membership information includes the cluster
+ * endpoint and current capabilities as represented by {@code SubClusterInfo}.
+ *
+ * @param subClusterId the subcluster whose information is required
+ * @return the {@code SubClusterInfo}, or {@code null} if there is no mapping
+ * for the subcluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getSubCluster(String subClusterId) throws YarnException;
+
+ /**
+ * Get the membership information of all the subclusters that are
+ * currently participating in federation. The membership information includes
+ * the cluster endpoint and current capabilities as represented by
+ * {@code SubClusterInfo}.
+ *
+ * @param filterInactiveSubclusters whether to filter inactive sub clusters
+ * @return a map of {@code SubClusterInfo} keyed by the {@code SubClusterId}
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getSubClusters(boolean filterInactiveSubclusters)
+ throws YarnException;
+
+ /**
+ * Get a map of all queue-to-policy configurations.
+ *
+ * @return the policies for all currently active queues in the system
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getPoliciesConfigurations() throws YarnException;
+
+ /**
+ * Get the policy configuration for a given queue.
+ *
+ * @param queue the queue whose {@code SubClusterPolicyConfiguration} is
+ * required
+ * @return the {@code SubClusterPolicyConfiguration} for the specified queue,
+ * or {@code null} if there is no mapping for the queue
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getPolicyConfiguration(String queue) throws YarnException;
+ /**
+ * Set the policy configuration for a given queue.
+ *
+ * @param policyConf the {@code SubClusterPolicyConfiguration} with the
+ * corresponding queue
+ * @return an empty response on successfully updating the
+ * {@code SubClusterPolicyConfiguration} for the specified queue
+ * @throws YarnException if the request is invalid/fails
+ */
+
+ Response setPolicyConfiguration(SubClusterPolicyConfigurationDAO policyConf)
+ throws YarnException;
+
+ /**
+ * Register the home {@code SubClusterId} of the newly submitted
+ * {@code ApplicationId}. Currently the response is empty if the operation
+ * was successful; if not, an exception reports the reason for the failure.
+ * If a mapping for the application already existed, the
+ * {@code SubClusterId} in this response will return the existing mapping
+ * which might be different from that in the
+ * {@code AddApplicationHomeSubClusterRequest}.
+ *
+ * @param appHomeDAO the request to register a new application with its home
+ * sub-cluster
+ * @return upon successful registration of the application in the StateStore,
+ * {@code AddApplicationHomeSubClusterRequest} containing the home
+ * sub-cluster of the application. Otherwise, an exception reporting
+ * reason for a failure
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response addApplicationHomeSubCluster(ApplicationHomeSubClusterDAO appHomeDAO)
+ throws YarnException;
+
+ /**
+ * Update the home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Currently the response is empty if the operation
+ * was successful; if not, an exception reports the reason for the failure.
+ *
+ * @param appHomeDAO the request to update the home sub-cluster of an
+ * application
+ * @return empty on successful update of the application in the StateStore;
+ * if not, an exception reporting the reason for the failure
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response updateApplicationHomeSubCluster(
+ ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException;
+
+ /**
+ * Get information about the application identified by the input
+ * {@code ApplicationId}.
+ *
+ * @param appId the application to query
+ * @return {@code ApplicationHomeSubCluster} containing the application's home
+ * subcluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getApplicationHomeSubCluster(String appId) throws YarnException;
+
+ /**
+ * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+ * all submitted applications to its home sub-cluster.
+ *
+ * @return the mapping of all submitted applications to their home
+ * sub-clusters
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response getApplicationsHomeSubCluster() throws YarnException;
+
+ /**
+ * Delete the mapping of home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Currently the response is empty if the operation
+ * was successful; if not, an exception reports the reason for the failure.
+ *
+ * @param appId the ID of the application to delete
+ * @return empty on successful update of the application in the StateStore;
+ * if not, an exception reporting the reason for the failure
+ * @throws YarnException if the request is invalid/fails
+ */
+ Response deleteApplicationHomeSubCluster(String appId) throws YarnException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java
new file mode 100644
index 00000000000..ecf4905c30e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router Federation StateStore Proxy package. **/
+package org.apache.hadoop.yarn.server.router.ssproxy;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java
index ba07a1afba4..e8af9e4bb85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.router.ssproxy.RouterStateStoreProxyWebServices;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
@@ -38,6 +39,7 @@ public RouterWebApp(Router router) {
public void setup() {
bind(JAXBContextResolver.class);
bind(RouterWebServices.class);
+ bind(RouterStateStoreProxyWebServices.class);
bind(GenericExceptionHandler.class);
bind(RouterWebApp.class).toInstance(this);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java
new file mode 100644
index 00000000000..445e6f048dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.FederationStateStoreBaseTest;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStore;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Validate the correctness of the StateStoreProxy in the router.
+ * This test case works by setting the facade to use a MemoryStateStore,
+ * and then testing the HttpProxyFederationStateStore, which communicates
+ * via REST with the StateStoreProxyWebServices.
+ */
+public class TestRouterStateStoreProxy extends FederationStateStoreBaseTest {
+
+ private static Router router;
+
+ private static FederationStateStore proxyStore;
+
+ @BeforeClass
+ public static void setUpProxy() {
+ // Set up the router proxy to use a memory state store as the backend.
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
+ MemoryFederationStateStore.class.getName());
+ // Disable facade caching so every request reaches the store.
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+ conf.set(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
+ StateStoreProxyDefaultInterceptor.class.getName());
+ proxyStore = new MemoryFederationStateStore();
+ FederationStateStoreFacade.getInstance().reinitialize(proxyStore, conf);
+ router = new Router();
+ router.init(conf);
+ router.start();
+ }
+
+ @AfterClass
+ public static void tearDownProxy() {
+ router.stop();
+ }
+
+ @Before
+ @Override
+ public void before() throws IOException, YarnException {
+ super.before();
+ proxyStore.init(getConf());
+ }
+
+ @After
+ @Override
+ public void after() throws Exception {
+ super.after();
+ proxyStore.close();
+ }
+
+ @Override
+ protected FederationStateStore createStateStore() {
+ Configuration conf = new Configuration();
+
+ // Set up our local state store; the base test exercises it over REST.
+ super.setConf(conf);
+ return new HttpProxyFederationStateStore();
+ }
+
+}
+