diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9a8e48ea4af..7c389fba6e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3427,6 +3427,37 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; + public static final String ROUTER_SSPROXY_PREFIX = ROUTER_PREFIX + "ssproxy."; + + public static final String ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE = + ROUTER_SSPROXY_PREFIX + "interceptor-class.pipeline"; + + public static final String DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS = + "org.apache.hadoop.yarn.server.router.ssproxy." + + "StateStoreProxyDefaultInterceptor"; + + public static final String FEDERATION_STATESTORE_HTTP_PROXY_PREFIX = + FEDERATION_PREFIX + "state-store.http-proxy"; + + public static final String FEDERATION_STATESTORE_HTTP_PROXY_URL = + FEDERATION_STATESTORE_HTTP_PROXY_PREFIX + "url"; + + public static final String DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_URL = + DEFAULT_ROUTER_WEBAPP_ADDRESS; + + public static final String + FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS = + FEDERATION_STATESTORE_HTTP_PROXY_PREFIX + "connect.timeout-ms"; + + public static final int + DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS = 60000; + + public static final String FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS = + FEDERATION_STATESTORE_HTTP_PROXY_PREFIX + "read.timeout-ms"; + + public static final int + DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS = 60000; + public static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 6f781fa7c0d..655495b30ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -128,6 +128,14 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS); + // Federation StateStore HTTP implementation configs to be ignored + configurationPrefixToSkipCompare + .add(YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_URL); + configurationPrefixToSkipCompare.add( + YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS); + configurationPrefixToSkipCompare.add( + YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS); + // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" configurationPropsToSkipCompare.add(YarnConfiguration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7ec1d9a6595..b68155537dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3605,6 +3605,19 @@ + + + The comma separated list of class names that implement the + RequestInterceptor interface. This is used by the + RouterStateStoreProxyWebServices + to create the request processing pipeline for users. + + yarn.router.ssproxy.interceptor-class.pipeline + + org.apache.hadoop.yarn.server.router.ssproxy.StateStoreProxyDefaultInterceptor + + + Comma-separated list of PlacementRules to determine how applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStore.java new file mode 100644 index 00000000000..ae39fa14e62 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStore.java @@ -0,0 +1,602 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.impl; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.StringKeyIgnoreCaseMultivaluedMap; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.apache.hadoop.yarn.webapp.RemoteExceptionData; +import org.apache.hadoop.yarn.webapp.WebApp; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.xml.ws.http.HTTPException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * REST implementation of {@link FederationStateStore}. + */ +public class HttpProxyFederationStateStore implements FederationStateStore { + + public static final Logger LOG = + LoggerFactory.getLogger(HttpProxyFederationStateStore.class); + + private String webAppUrl; + private String webAppResourceRootPath; + private final Clock clock = new MonotonicClock(); + private Map subClusters; + + // It is very expensive to create the client + // Jersey will spawn a thread for every client request + private Client client = null; + + public final static String FEDERATION_DEFAULT_SUB_CLUSTERS = + "yarn.federation.default.subclusters"; + + public final static String FEDERATION_STATE_STORE_FALLBACK_ENABLED = + "yarn.federation.statestore.fallback.enabled"; + public final static boolean DEFAULT_FEDERATION_STATE_STORE_TIMEOUT_ENABLED = + false; + + private boolean fallbackToConfig; + + @Override + public void init(Configuration conf) throws YarnException { + this.webAppUrl = WebAppUtils.HTTP_PREFIX + conf + .get(YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_URL, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_URL); + + this.webAppResourceRootPath = HttpProxyFederationStateStoreConsts.ROOT; + this.client = createJerseyClient(conf); + this.subClusters = new ConcurrentHashMap<>(); + this.fallbackToConfig = + conf.getBoolean(FEDERATION_STATE_STORE_FALLBACK_ENABLED, + DEFAULT_FEDERATION_STATE_STORE_TIMEOUT_ENABLED); + loadSubClustersFromConfig(conf); + } + + protected void loadSubClustersFromConfig(Configuration conf) + throws YarnException { + String str = conf.get(FEDERATION_DEFAULT_SUB_CLUSTERS); + if (str == null) { + throw new YarnException("Default subclusters not configured in " + + FEDERATION_DEFAULT_SUB_CLUSTERS); + } + Iterator iter = StringUtils.getStringCollection(str).iterator(); + while (iter.hasNext()) { + SubClusterId scId = SubClusterId.newInstance(iter.next().trim()); + String rmAddr = iter.next().trim(); + String amRmPort = iter.next().trim(); + String clientRmPort = iter.next().trim(); + String adminPort = iter.next().trim(); + String webPort = iter.next().trim(); + subClusters.put(scId, SubClusterInfo + .newInstance(scId, rmAddr + ":" + amRmPort, + rmAddr + ":" + clientRmPort, rmAddr + ":" + adminPort, + rmAddr + ":" + webPort, 0, SubClusterState.SC_RUNNING, 0, "")); + LOG.info("Loaded subcluster from config: {}", subClusters.get(scId)); + } + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest request) throws YarnException { + + // Input validator + FederationMembershipStateStoreInputValidator.validate(request); + + SubClusterInfoDAO scInfoDAO = + new SubClusterInfoDAO(request.getSubClusterInfo()); + + try { + invokePost(HttpProxyFederationStateStoreConsts.PATH_REGISTER, scInfoDAO, + null); + } catch (HTTPException e) { + LOG.error("registerSubCluster REST call failed ", e); + return null; + } + + return SubClusterRegisterResponse.newInstance(); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest request) throws YarnException { + // Input validator + FederationMembershipStateStoreInputValidator.validate(request); + + try { + invokePost(HttpProxyFederationStateStoreConsts.PATH_DEREGISTER, + new SubClusterDeregisterDAO(request), null); + } catch (HTTPException e) { + LOG.error("deregisterSubCluster REST call failed ", e); + return null; + } + + return SubClusterDeregisterResponse.newInstance(); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest request) throws YarnException { + // Input validator + FederationMembershipStateStoreInputValidator.validate(request); + + try { + invokePost(HttpProxyFederationStateStoreConsts.PATH_HEARTBEAT, + new SubClusterHeartbeatDAO(request), null); + } catch (HTTPException e) { + LOG.error("subClusterHeartbeat REST call failed ", e); + return null; + } + + return SubClusterHeartbeatResponse.newInstance(); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest subClusterRequest) throws YarnException { + // Input validator + FederationMembershipStateStoreInputValidator.validate(subClusterRequest); + + SubClusterId scId = subClusterRequest.getSubClusterId(); + String scIdStr = scId.getId(); + SubClusterInfoDAO scInfoDao = null; + + long startTime = clock.getTime(); + + try { + scInfoDao = invokeGet( + HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS + "/" + scIdStr, + null, SubClusterInfoDAO.class); + FederationStateStoreClientMetrics + .succeededStateStoreCall(clock.getTime() - startTime); + } catch (HTTPException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the SubCluster information for " + scIdStr, e); + } + + if (scInfoDao == null) { + LOG.warn("The queried SubCluster: {} does not exist.", scIdStr); + return null; + } + + SubClusterInfo scInfo = scInfoDao.toSubClusterInfo(); + + try { + FederationMembershipStateStoreInputValidator.checkSubClusterInfo(scInfo); + } catch (FederationStateStoreInvalidInputException e) { + String errMsg = "SubCluster " + scIdStr + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return GetSubClusterInfoResponse.newInstance(scInfo); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest subClustersRequest) throws YarnException { + + boolean filterInactiveSubClusters = + subClustersRequest.getFilterInactiveSubClusters(); + + MultivaluedMap queryParams = + new StringKeyIgnoreCaseMultivaluedMap(); + queryParams.add(HttpProxyFederationStateStoreConsts.QUERY_SC_FILTER, + Boolean.toString(filterInactiveSubClusters)); + + SubClustersInfoDAO scsInfoDao = null; + + long startTime = clock.getTime(); + + try { + scsInfoDao = + invokeGet(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS, + queryParams, SubClustersInfoDAO.class); + FederationStateStoreClientMetrics + .succeededStateStoreCall(clock.getTime() - startTime); + } catch (Exception e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + if (fallbackToConfig) { + List subClustersInfo = new ArrayList<>(); + subClustersInfo.addAll(subClusters.values()); + return GetSubClustersInfoResponse.newInstance(subClustersInfo); + } + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the SubClusters ", e); + } + + List scInfoList = new ArrayList<>(); + + if (scsInfoDao != null) { + for (SubClusterInfoDAO scInfoDao : scsInfoDao.subClusters) { + SubClusterInfo scInfo = scInfoDao.toSubClusterInfo(); + try { + FederationMembershipStateStoreInputValidator + .checkSubClusterInfo(scInfo); + } catch (FederationStateStoreInvalidInputException e) { + String errMsg = "SubCluster " + scInfo.getSubClusterId().toString() + + " is not valid"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + scInfoList.add(scInfo); + } + } + + return GetSubClustersInfoResponse.newInstance(scInfoList); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + + SubClusterPoliciesConfigurationsDAO scPoliciesConfDao = null; + + long startTime = clock.getTime(); + + try { + scPoliciesConfDao = + invokeGet(HttpProxyFederationStateStoreConsts.PATH_POLICY, null, + SubClusterPoliciesConfigurationsDAO.class); + FederationStateStoreClientMetrics + .succeededStateStoreCall(clock.getTime() - startTime); + } catch (HTTPException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the policy information for all the queues.", e); + } + + List scPoliciesList = new ArrayList<>(); + + if (scPoliciesConfDao != null) { + for (SubClusterPolicyConfigurationDAO scPolicyDao : scPoliciesConfDao.policies) { + scPoliciesList.add(scPolicyDao.toSubClusterPolicyConfiguration()); + } + } + + return GetSubClusterPoliciesConfigurationsResponse + .newInstance(scPoliciesList); + } + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + + // Input validator + FederationPolicyStoreInputValidator.validate(request); + + String queueName = request.getQueue(); + SubClusterPolicyConfigurationDAO scPolicyConfDao = null; + + long startTime = clock.getTime(); + + try { + scPolicyConfDao = invokeGet( + HttpProxyFederationStateStoreConsts.PATH_POLICY + "/" + queueName, + null, SubClusterPolicyConfigurationDAO.class); + FederationStateStoreClientMetrics + .succeededStateStoreCall(clock.getTime() - startTime); + } catch (HTTPException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to select the policy for the queue :" + queueName, e); + } + + if (scPolicyConfDao == null) { + LOG.warn("Policy for queue: {} does not exist.", queueName); + return null; + } + + SubClusterPolicyConfiguration scPolicy = + scPolicyConfDao.toSubClusterPolicyConfiguration(); + + return GetSubClusterPolicyConfigurationResponse.newInstance(scPolicy); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + // Input validator + FederationPolicyStoreInputValidator.validate(request); + + try { + invokePost(HttpProxyFederationStateStoreConsts.PATH_POLICY, + new SubClusterPolicyConfigurationDAO( + request.getPolicyConfiguration()), null); + } catch (HTTPException e) { + LOG.error("setPolicyConfiguration REST call failed ", e); + return null; + } + + return SetSubClusterPolicyConfigurationResponse.newInstance(); + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + SubClusterIdDAO scIdDAO; + + try { + scIdDAO = invokePost(HttpProxyFederationStateStoreConsts.PATH_APP_HOME, + new ApplicationHomeSubClusterDAO(request), SubClusterIdDAO.class); + } catch (HTTPException e) { + LOG.error("addApplicationHomeSubCluster REST call failed ", e); + return null; + } + + return AddApplicationHomeSubClusterResponse + .newInstance(scIdDAO.toSubClusterId()); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + try { + invokePut(HttpProxyFederationStateStoreConsts.PATH_APP_HOME, + new ApplicationHomeSubClusterDAO(request)); + } catch (HTTPException e) { + LOG.error("updateApplicationHomeSubCluster REST call failed ", e); + return null; + } + + return UpdateApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + String appId = request.getApplicationId().toString(); + + SubClusterIdDAO scIdDAO; + try { + scIdDAO = invokeGet( + HttpProxyFederationStateStoreConsts.PATH_APP_HOME + "/" + appId, null, + SubClusterIdDAO.class); + } catch (HTTPException e) { + LOG.error("getApplicationHomeSubCluster REST call failed ", e); + return null; + } + + if (scIdDAO == null) { + return null; + } + + return GetApplicationHomeSubClusterResponse.newInstance( + ApplicationHomeSubCluster + .newInstance(request.getApplicationId(), scIdDAO.toSubClusterId())); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + ApplicationsHomeSubClusterDAO ret; + try { + ret = invokeGet(HttpProxyFederationStateStoreConsts.PATH_APP_HOME, null, + ApplicationsHomeSubClusterDAO.class); + } catch (HTTPException e) { + LOG.error("getApplicationsHomeSubCluster REST call failed ", e); + return null; + } + + return GetApplicationsHomeSubClusterResponse + .newInstance(ret.toApplicationsHome()); + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + // Input validator + FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + + String appId = request.getApplicationId().toString(); + + try { + invokeDelete( + HttpProxyFederationStateStoreConsts.PATH_APP_HOME + "/" + appId); + } catch (HTTPException e) { + LOG.error("deleteApplicationHomeSubCluster REST call failed ", e); + return null; + } + + return DeleteApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public void close() throws Exception { + } + + @Override + public Version getCurrentVersion() { + throw new NotImplementedException("Code not implemented."); + } + + @Override + public Version loadVersion() { + throw new NotImplementedException("Code not implemented."); + } + + private T invokeGet(String path, + MultivaluedMap queryParams, final Class responseClass) + throws YarnException { + return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.GET, path, + queryParams, null, responseClass); + } + + private T invokeDelete(String path) throws YarnException { + return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.DELETE, path, + null, null, null); + } + + private T invokePost(String path, Object body, + final Class responseClass) throws YarnException { + return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.POST, path, + null, body, responseClass); + } + + private T invokePut(String path, Object body) throws YarnException { + return invokeFederationStateStoreProxy(webAppUrl, WebApp.HTTP.PUT, path, + null, body, null); + } + + private T invokeFederationStateStoreProxy(String webApp, + WebApp.HTTP method, String path, + MultivaluedMap queryParams, Object body, + final Class responseClass) throws YarnException { + T obj = null; + + WebResource webResource = client.resource(webApp); + + if (queryParams != null && queryParams.size() > 0) { + webResource = webResource.queryParams(queryParams); + } + + webResource = webResource.path(webAppResourceRootPath).path(path); + + WebResource.Builder builder = webResource.accept(MediaType.APPLICATION_XML); + + LOG.debug("Invoking HTTP REST request with method " + method.name() + + " to resource: " + builder.toString()); + ClientResponse response = null; + try { + switch (method) { + case GET: + response = builder.get(ClientResponse.class); + break; + case PUT: + response = builder.put(ClientResponse.class, body); + break; + case POST: + response = builder.post(ClientResponse.class, body); + break; + case DELETE: + response = builder.delete(ClientResponse.class); + break; + default: + throw new YarnRuntimeException( + "Unknown HTTP method type " + method.name()); + } + + if (response.getStatus() == 200) { + if (responseClass != null) { + obj = response.getEntity(responseClass); + } + } else if (response.getStatus() == 404) { + return null; + } else { + throw new FederationStateStoreException( + response.getEntity(RemoteExceptionData.class).getMessage()); + } + return obj; + } finally { + if (response != null) { + response.close(); + } + if (client != null) { + client.destroy(); + } + } + } + + /** + * Create a Jersey client instance. + */ + private Client createJerseyClient(Configuration conf) { + Client client = Client.create(); + client.setConnectTimeout(conf.getInt( + YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS)); + client.setReadTimeout(conf.getInt( + YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS)); + return client; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStoreConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStoreConsts.java new file mode 100644 index 00000000000..ee66599d0ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/HttpProxyFederationStateStoreConsts.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +/** + * Constants for {@code StateStoreWebServiceProtocol}. + */ +public class HttpProxyFederationStateStoreConsts { + // Query Parameters + public static final String QUERY_SC_FILTER = "filterInactiveSubClusters"; + + // URL Parameters + public static final String PARAM_SCID = "subcluster"; + public static final String PARAM_APPID = "appid"; + public static final String PARAM_QUEUE = "queue"; + + // Paths + public static final String ROOT = "/ws/v1/statestore"; + + public static final String PATH_SUBCLUSTERS = "/subclusters"; + public static final String PATH_SUBCLUSTERS_SCID = + "/subclusters/{" + PARAM_SCID + "}"; + + public static final String PATH_POLICY = "/policy"; + public static final String PATH_POLICY_QUEUE = + "/policy/{" + PARAM_QUEUE + "}"; + + public static final String PATH_VIP_HEARTBEAT = "/heartbeat"; + + public static final String PATH_APP_HOME = "/apphome"; + public static final String PATH_APP_HOME_APPID = + "/apphome/{" + PARAM_APPID + "}"; + + public static final String PATH_REGISTER = "/subcluster/register"; + public static final String PATH_DEREGISTER = "/subcluster/deregister"; + public static final String PATH_HEARTBEAT = "/subcluster/heartbeat"; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationHomeSubClusterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationHomeSubClusterDAO.java new file mode 100644 index 00000000000..d5e671c8e29 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationHomeSubClusterDAO.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.util.Apps; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + *

+ * ApplicationHomeSubCluster is a report of the runtime information of the + * application that is running in the federated cluster. + *

+ * It includes information such as: + * + */ +@XmlRootElement(name = "applicationHomeSubClusterDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class ApplicationHomeSubClusterDAO { + protected String appId; + protected SubClusterIdDAO subClusterId; + + public ApplicationHomeSubClusterDAO(){ + } // JAXB needs this + + public ApplicationHomeSubClusterDAO(ApplicationHomeSubCluster appHome){ + appId = appHome.getApplicationId().toString(); + subClusterId = new SubClusterIdDAO(appHome.getHomeSubCluster()); + } + + public ApplicationHomeSubClusterDAO( + AddApplicationHomeSubClusterRequest request){ + this(request.getApplicationHomeSubCluster()); + } + + public ApplicationHomeSubClusterDAO( + UpdateApplicationHomeSubClusterRequest request){ + this(request.getApplicationHomeSubCluster()); + } + + public ApplicationHomeSubCluster toApplicationHomeSubCluster(){ + return ApplicationHomeSubCluster.newInstance(Apps.toAppID(appId), + subClusterId.toSubClusterId()); + } + + public String getAppId(){ + return appId; + } + + public SubClusterIdDAO getSubClusterId(){ + return subClusterId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java new file mode 100644 index 00000000000..dba6b690ad1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/ApplicationsHomeSubClusterDAO.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + *

+ * ApplicationsHomeSubClustersDao is a report of the runtime information of the + * applications that are running in the federated cluster. + *

+ */ +@XmlRootElement(name = "applicationsHomeSubClusterDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class ApplicationsHomeSubClusterDAO { + + protected List appHomes; + + public ApplicationsHomeSubClusterDAO(){ + } // JAXB needs this + + public ApplicationsHomeSubClusterDAO( + List appHomes) { + this.appHomes = new ArrayList<>(); + for (ApplicationHomeSubCluster appHome : appHomes) { + this.appHomes.add(new ApplicationHomeSubClusterDAO(appHome)); + } + } + + public List toApplicationsHome(){ + List ret = new ArrayList<>(); + for(ApplicationHomeSubClusterDAO appHome : appHomes){ + ret.add(appHome.toApplicationHomeSubCluster()); + } + return ret; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterDeregisterDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterDeregisterDAO.java new file mode 100644 index 00000000000..8ba8495cf5f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterDeregisterDAO.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + *

+ * The request sent to set the state of a subcluster to either + * SC_DECOMMISSIONED, SC_LOST, or SC_DEREGISTERED. + * + *

+ * The update includes details such as: + *
    + *
  • {@link SubClusterId}
  • + *
  • {@link SubClusterState}
  • + *
+ */ +@XmlRootElement(name = "deregisterDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterDeregisterDAO { + + protected SubClusterIdDAO scId; + protected SubClusterStateDAO scState; + + public SubClusterDeregisterDAO(){ + } // JAXB needs this + + public SubClusterDeregisterDAO(SubClusterId id, SubClusterState state){ + this.scId = new SubClusterIdDAO(id); + this.scState = new SubClusterStateDAO(state); + } + + public SubClusterDeregisterDAO(SubClusterDeregisterRequest request) { + this(request.getSubClusterId(), request.getState()); + } + + public SubClusterIdDAO getSubClusterId(){ + return scId; + } + + public SubClusterStateDAO getScState() { + return scState; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java new file mode 100644 index 00000000000..c6d3f980c5a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterHeartbeatDAO.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + *

+ * SubClusterHeartbeatRequest is a report of the runtime information of the + * subcluster that is participating in federation. + *

+ * It includes information such as: + *
    + *
  • {@link SubClusterId}
  • + *
  • The URL of the subcluster
  • + *
  • The timestamp representing the last start time of the subCluster
  • + *
  • {@code FederationsubClusterState}
  • + *
  • The current capacity and utilization of the subCluster
  • + *
+ */ +@XmlRootElement(name = "heartbeatDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterHeartbeatDAO { + + protected SubClusterIdDAO scId; + protected SubClusterStateDAO scState; + protected String capability; + + public SubClusterHeartbeatDAO(){ + } // JAXB needs this + + public SubClusterHeartbeatDAO(SubClusterId id, SubClusterState state, + String capability) { + this.scId = new SubClusterIdDAO(id); + this.scState = new SubClusterStateDAO(state); + this.capability = capability; + } + + public SubClusterHeartbeatDAO(SubClusterHeartbeatRequest request){ + this(request.getSubClusterId(), request.getState(), + request.getCapability()); + } + + public SubClusterIdDAO getSubClusterId(){ + return scId; + } + + public SubClusterStateDAO getScState() { + return scState; + } + + public String getCapability(){ + return capability; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java new file mode 100644 index 00000000000..48b6f3dd346 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterIdDAO.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + *

+ * SubClusterId represents the globally unique identifier for a + * subcluster that is participating in federation. + *

+ * + * The globally unique nature of the identifier is obtained from the + * FederationMembershipStateStore on initialization. + */ +@XmlRootElement(name = "subClustersIdDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterIdDAO { + + public String subClusterId; + + public SubClusterIdDAO() { + } // JAXB needs this + + public SubClusterIdDAO(SubClusterId scId) { + subClusterId = scId.getId(); + } + + public SubClusterId toSubClusterId(){ + return SubClusterId.newInstance(subClusterId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java new file mode 100644 index 00000000000..1646a318ec9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterInfoDAO.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + *

+ * SubClusterInfo is a report of the runtime information of the subcluster that + * is participating in federation. + *

+ * It includes information such as: + *
    + *
  • {@link SubClusterId}
  • + *
  • The URL of the subcluster
  • + *
  • The timestamp representing the last start time of the subCluster
  • + *
  • {@code FederationsubClusterState}
  • + *
  • The current capacity and utilization of the subCluster
  • + *
+ */ +@XmlRootElement(name = "subClusterInfoDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterInfoDAO { + + public String subClusterId; + public String amRMServiceAddress; + public String clientRMServiceAddress; + public String rmAdminServiceAddress; + public String rmWebServiceAddress; + public long lastHeartBeat; + public SubClusterStateDAO state; + public long lastStartTime; + public String capability; + + public SubClusterInfoDAO() { + } // JAXB needs this + + public SubClusterInfoDAO(SubClusterInfo scinfo) { + this.subClusterId = scinfo.getSubClusterId().toString(); + this.amRMServiceAddress = scinfo.getAMRMServiceAddress(); + this.clientRMServiceAddress = scinfo.getClientRMServiceAddress(); + this.rmAdminServiceAddress = scinfo.getRMAdminServiceAddress(); + this.rmWebServiceAddress = scinfo.getRMWebServiceAddress(); + this.lastHeartBeat = scinfo.getLastHeartBeat(); + this.state = new SubClusterStateDAO(scinfo.getState()); + this.lastStartTime = scinfo.getLastStartTime(); + this.capability = scinfo.getCapability(); + } + + public SubClusterInfo toSubClusterInfo() { + return SubClusterInfo.newInstance( + SubClusterId.newInstance(subClusterId), + amRMServiceAddress, + clientRMServiceAddress, + rmAdminServiceAddress, + rmWebServiceAddress, + this.lastHeartBeat, + state.getSubClusterState(), + lastStartTime, + capability); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java new file mode 100644 index 00000000000..dcbd34a1471 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPoliciesConfigurationsDAO.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * SubClusterPoliciesConfigurationsDAO is a class that represents a set of a + * policies. + */ +@XmlRootElement(name = "subClusterPoliciesConfigurationsDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterPoliciesConfigurationsDAO { + + public List policies = new ArrayList(); + + public SubClusterPoliciesConfigurationsDAO() { + } // JAXB needs this + + public SubClusterPoliciesConfigurationsDAO( + Collection policies) { + for (SubClusterPolicyConfiguration entry : policies) { + this.policies.add(new SubClusterPolicyConfigurationDAO(entry)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPolicyConfigurationDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPolicyConfigurationDAO.java new file mode 100644 index 00000000000..95fcc7a3721 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterPolicyConfigurationDAO.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.commons.net.util.Base64; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.nio.ByteBuffer; + +/** + * {@link SubClusterPolicyConfigurationDAO} is a class that represents a + * configuration of a policy. For a single queue, it contains a policy type + * (resolve to a class name) and its params as an opaque {@link ByteBuffer}. + * + * Note: by design the params are an opaque ByteBuffer, this allows for enough + * flexibility to evolve the policies without impacting the protocols to/from + * the federation state store. + */ +@XmlRootElement(name = "subClusterPolicyConfigurationDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterPolicyConfigurationDAO { + public String queueName; + public String policyType; + public String policyParams; + + public SubClusterPolicyConfigurationDAO() { + } // JAXB needs this + + public SubClusterPolicyConfigurationDAO( + SubClusterPolicyConfiguration policy) { + this.queueName = policy.getQueue(); + this.policyType = policy.getType(); + this.policyParams = Base64.encodeBase64String(policy.getParams().array()); + } + + public SubClusterPolicyConfiguration toSubClusterPolicyConfiguration() { + return SubClusterPolicyConfiguration.newInstance(queueName, policyType, + ByteBuffer.wrap(Base64.decodeBase64(policyParams))); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java new file mode 100644 index 00000000000..1b2cb3aedf4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClusterStateDAO.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * State of a SubCluster + */ +@XmlRootElement(name = "subClusterStateDAO") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterStateDAO { + + protected enum SCState { + SC_NEW, + + /** Subcluster is registered and the RM sent a heartbeat recently. */ + SC_RUNNING, + + /** Subcluster is unhealthy. */ + SC_UNHEALTHY, + + /** Subcluster is in the process of being out of service. */ + SC_DECOMMISSIONING, + + /** Subcluster is out of service. */ + SC_DECOMMISSIONED, + + /** RM has not sent a heartbeat for some configured time threshold. */ + SC_LOST, + + /** Subcluster has unregistered. */ + SC_UNREGISTERED; + }; + + protected SCState scState; + + public SubClusterStateDAO() { + } // JAXB needs this + + public SubClusterStateDAO(SubClusterState state) { + scState = SCState.valueOf(state.toString()); + } + + public SubClusterState getSubClusterState() { + return SubClusterState.valueOf(scState.toString()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java new file mode 100644 index 00000000000..5a700c422d6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/SubClustersInfoDAO.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.records.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * SubClustersInfoDAO is a class that represent a set of SubClusterInfo. + */ +@XmlRootElement(name = "subClustersInfoMap") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClustersInfoDAO { + + public List subClusters = new ArrayList(); + + public SubClustersInfoDAO() { + } // JAXB needs this + + public SubClustersInfoDAO(Collection subClustersInfo) { + for(SubClusterInfo entry : subClustersInfo) { + subClusters.add(new SubClusterInfoDAO(entry)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/package-info.java new file mode 100644 index 00000000000..b886837d401 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/dao/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + **/ + +/** Federation StateStore DAO package.*/ +package org.apache.hadoop.yarn.server.federation.store.records.dao; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index df5f50c67a4..6ae9ef0bd7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -67,9 +67,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.slf4j.Logger; @@ -226,6 +228,17 @@ public static FederationStateStoreFacade getInstance() { return FACADE; } + /** + * Register a subcluster identified by {@code SubClusterId} to + * indicate participation in federation + * + * @param info the SubClusterInfo for the SubCluster that is participating + * in federation + */ + public void registerSubCluster(SubClusterInfo info) throws YarnException { + stateStore.registerSubCluster(SubClusterRegisterRequest.newInstance(info)); + } + /** * Deregister a subcluster identified by {@code SubClusterId} to * change state in federation. This can be done to mark the sub cluster lost, @@ -483,6 +496,20 @@ public Configuration getConf() { return this.conf; } + /** + * Periodic heartbeat from a ResourceManager to indicate + * liveliness. + * + * @param subClusterId the id of the sub cluster heart beating + * @param state the state of the sub cluster + * @param capability the capability of the sub cluster + */ + public void subClusterHeartBeat(SubClusterId subClusterId, + SubClusterState state, String capability) throws YarnException { + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest + .newInstance(subClusterId, state, capability)); + } + /** * Helper method to create instances of Object using the class name defined in * the configuration object. The instances creates {@link RetryProxy} using diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java new file mode 100644 index 00000000000..a2e109467bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.router.ssproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts; +import org.apache.hadoop.yarn.server.router.Router; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO; +import org.apache.hadoop.yarn.util.LRUCacheHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** + * RouterStateStoreProxyWebServices is a service that runs on each router that + * can be used to intercept and inspect {@link StateStoreWebServiceProtocol} + * messages from client to the cluster FederationStateStore. It listens + * {@link StateStoreWebServiceProtocol} REST messages from the client and + * creates a request intercepting pipeline instance for each client. The + * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can + * inspect and modify the request/response as needed. The main difference with + * AMRMProxyService is the protocol they implement. + **/ +@Singleton +@Path(HttpProxyFederationStateStoreConsts.ROOT) +public class RouterStateStoreProxyWebServices + implements StateStoreWebServiceProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class); + private final Router router; + private final Configuration conf; + private @Context HttpServletResponse response; + + private Map userPipelineMap; + + @Inject + public RouterStateStoreProxyWebServices(final Router router, + Configuration conf) { + this.router = router; + this.conf = conf; + int maxCacheSize = + conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap( + new LRUCacheHashMap( + maxCacheSize, true)); + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param config + * @return the intercepter class names as an instance of ArrayList + */ + private List getInterceptorClassNames(Configuration config) { + String configuredInterceptorClassNames = config + .get(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + private void init() { + // clear content type + response.setContentType(null); + } + + @POST + @Path(HttpProxyFederationStateStoreConsts.PATH_REGISTER) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response registerSubCluster(SubClusterInfoDAO scInfoDAO) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().registerSubCluster(scInfoDAO); + } + + @POST + @Path(HttpProxyFederationStateStoreConsts.PATH_DEREGISTER) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().deregisterSubCluster(deregisterDAO); + } + + @POST + @Path(HttpProxyFederationStateStoreConsts.PATH_HEARTBEAT) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().subClusterHeartBeat(heartbeatDAO); + } + + @GET + @Path(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS_SCID) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getSubCluster( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_SCID) String subClusterId) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().getSubCluster(subClusterId); + } + + @GET + @Path(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getSubClusters( + @QueryParam(HttpProxyFederationStateStoreConsts.QUERY_SC_FILTER) + @DefaultValue("true") boolean filterInactiveSubClusters) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor() + .getSubClusters(filterInactiveSubClusters); + } + + @GET + @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getPoliciesConfigurations() throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().getPoliciesConfigurations(); + } + + @GET + @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY_QUEUE) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getPolicyConfiguration( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_QUEUE) String queue) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().getPolicyConfiguration(queue); + } + + @POST + @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response setPolicyConfiguration( + SubClusterPolicyConfigurationDAO policyConf) throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().setPolicyConfiguration(policyConf); + } + + @POST + @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response addApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor() + .addApplicationHomeSubCluster(appHomeDAO); + } + + @PUT + @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response updateApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor() + .updateApplicationHomeSubCluster(appHomeDAO); + } + + @GET + @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME_APPID) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getApplicationHomeSubCluster( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID) String appId) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().getApplicationHomeSubCluster(appId); + } + + @GET + @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getApplicationsHomeSubCluster() + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().getApplicationsHomeSubCluster(); + } + + @DELETE + @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME_APPID) + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response deleteApplicationHomeSubCluster( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID) String appId) + throws YarnException { + init(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); + return pipeline.getRootInterceptor().deleteApplicationHomeSubCluster(appId); + } + + @VisibleForTesting + protected RequestInterceptorChainWrapper getInterceptorChain( + HttpServletRequest hsr) { + String user = ""; + if (hsr != null) { + user = hsr.getRemoteUser(); + } + try { + if (user == null || user.equals("")) { + // Yarn Router user + user = UserGroupInformation.getCurrentUser().getUserName(); + } + } catch (IOException e) { + LOG.error("IOException " + e.getMessage()); + } + if (!userPipelineMap.containsKey(user)) { + initializePipeline(user); + } + RequestInterceptorChainWrapper chain = userPipelineMap.get(user); + if (chain != null && chain.getRootInterceptor() != null) { + return chain; + } + return initializePipeline(user); + } + + /** + * Gets the Request intercepter chains for all the users. + * + * @return the request intercepter chains. + */ + @VisibleForTesting + protected Map getPipelines() { + return this.userPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + @VisibleForTesting + protected StateStoreProxyInterceptor createRequestInterceptorChain() { + + List interceptorClassNames = getInterceptorClassNames(conf); + + StateStoreProxyInterceptor pipeline = null; + StateStoreProxyInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(interceptorClassName); + if (StateStoreProxyInterceptor.class + .isAssignableFrom(interceptorClass)) { + StateStoreProxyInterceptor interceptorInstance = + (StateStoreProxyInterceptor) ReflectionUtils + .newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException( + "Class: " + interceptorClassName + " not instance of " + + StateStoreProxyInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate RESTRequestInterceptor: " + + interceptorClassName, e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Initializes the request intercepter pipeline for the specified user. + * + * @param user + */ + private RequestInterceptorChainWrapper initializePipeline(String user) { + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return userPipelineMap.get(user); + } + + RequestInterceptorChainWrapper chainWrapper = + new RequestInterceptorChainWrapper(); + try { + // We should init the pipeline instance after it is created and then + // add to the map, to ensure thread safe. + LOG.info("Initializing request processing pipeline for user: {}", user); + StateStoreProxyInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + LOG.error("Init StateStoreProxyInterceptor error for user: " + user, e); + throw e; + } + this.userPipelineMap.put(user, chainWrapper); + return chainWrapper; + } + } + + /** + * Private structure for encapsulating RequestInterceptor and user instances. + */ + @Private + public static class RequestInterceptorChainWrapper { + private StateStoreProxyInterceptor rootInterceptor; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param interceptor the first interceptor in the pipeline + */ + public synchronized void init(StateStoreProxyInterceptor interceptor) { + this.rootInterceptor = interceptor; + } + + /** + * Gets the root request intercepter. + * + * @return the root request intercepter + */ + public synchronized StateStoreProxyInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Shutdown the chain of interceptors when the object is destroyed. + */ + @Override + protected void finalize() { + rootInterceptor.shutdown(); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java new file mode 100644 index 00000000000..c5be3f29e99 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.ssproxy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterStateDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; +import java.util.List; +import java.util.Map; + +/** + * Extends the {@code StateStoreProxyInterceptor} class and provides an + * implementation that simply forwards the client requests to the + * FederationStateStore. + */ +public class StateStoreProxyDefaultInterceptor implements + StateStoreProxyInterceptor { + + private static final Logger LOG = LoggerFactory + .getLogger(StateStoreProxyDefaultInterceptor.class.getName()); + + private FederationStateStoreFacade stateStoreFacade; + private @Context Configuration conf; + + @Override + public void init(String user) { + this.stateStoreFacade = FederationStateStoreFacade.getInstance(); + } + + @Override + public void shutdown() { + + } + + @Override + public void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor) { + throw new YarnRuntimeException(this.getClass().getName() + + " should be the last interceptor"); + } + + @Override + public StateStoreProxyInterceptor getNextInterceptor() { + return null; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public Response registerSubCluster(SubClusterInfoDAO scInfoDAO) + throws YarnException { + LOG.info("Registering SubCluster {}", scInfoDAO.subClusterId); + try { + stateStoreFacade.registerSubCluster(scInfoDAO.toSubClusterInfo()); + } catch (YarnException e) { + LOG.error("Could not register SubCluster {}", scInfoDAO.subClusterId, e); + throw e; + } + LOG.info("Registered SubCluster {}", scInfoDAO.subClusterId); + return Response.status(Response.Status.OK).build(); + } + + @Override + public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO) + throws YarnException { + SubClusterIdDAO scIdDAO = deregisterDAO.getSubClusterId(); + SubClusterStateDAO scStateDAO = deregisterDAO.getScState(); + LOG.info("Deregistering SubCluster {}", scIdDAO.subClusterId); + try { + stateStoreFacade + .deregisterSubCluster(SubClusterId.newInstance(scIdDAO.subClusterId), + scStateDAO.getSubClusterState()); + } catch (YarnException e) { + LOG.error("Could not deregister SubCluster {}", scIdDAO.subClusterId, e); + throw e; + } + LOG.info("Deregistered SubCluster {}", scIdDAO.subClusterId); + return Response.status(Response.Status.OK).build(); + } + + @Override + public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO) + throws YarnException { + SubClusterIdDAO scIdDAO = heartbeatDAO.getSubClusterId(); + SubClusterStateDAO scStateDAO = heartbeatDAO.getScState(); + LOG.info("Heartbeating for SubCluster {}", scIdDAO.subClusterId); + try { + stateStoreFacade + .subClusterHeartBeat(SubClusterId.newInstance(scIdDAO.subClusterId), + scStateDAO.getSubClusterState(), heartbeatDAO.getCapability()); + } catch (YarnException e) { + LOG.error("Could not heartbeat for SubCluster {}", scIdDAO.subClusterId, + e); + throw e; + } + LOG.info("Heartbeat for SubCluster {}", scIdDAO.subClusterId); + return Response.status(Response.Status.OK).build(); + } + + @Override + public Response getSubCluster( + String subClusterId) throws YarnException { + + if (subClusterId == null || subClusterId.isEmpty()) { + throw new NotFoundException("subClusterId is empty or null"); + } + + LOG.debug("Fetching subcluster info for subcluster: " + subClusterId); + + SubClusterInfo resp = + stateStoreFacade.getSubCluster(SubClusterId.newInstance(subClusterId)); + + LOG.debug("Retrieved subcluster info for subcluster: " + subClusterId + + ". Subcluster details:" + resp); + + if(resp == null){ + LOG.warn("Subcluster {} does not exist", subClusterId); + return Response.status(Response.Status.NOT_FOUND).build(); + } + + return Response.status(Response.Status.OK).entity( + new SubClusterInfoDAO(resp)).build(); + } + + @Override + public Response getSubClusters( + boolean filterInactiveSubClusters) throws YarnException { + LOG.debug("Fetching info for all subclusters. filterInactiveSubClusters=" + + filterInactiveSubClusters); + + Map resp = + stateStoreFacade.getSubClusters(filterInactiveSubClusters); + + LOG.info( + "Retrieved subcluster info for all subclusters. Subcluster count is=" + + resp.size()); + + return Response.status(Response.Status.OK).entity( + new SubClustersInfoDAO(resp.values())).build(); + } + + @Override + public Response getPoliciesConfigurations() + throws YarnException { + LOG.debug("Fetching policy info for all queues"); + + Map resp = + stateStoreFacade.getPoliciesConfigurations(); + + LOG.debug( + "Retrieved policy info for all queues. Policy count is=" + resp.size()); + + return Response.status(Response.Status.OK).entity( + new SubClusterPoliciesConfigurationsDAO(resp.values())).build(); + } + + @Override + public Response getPolicyConfiguration( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_QUEUE) + String queue) throws YarnException { + + if (queue == null || queue.isEmpty()) { + throw new NotFoundException("queue name is empty or null"); + } + + LOG.debug("Fetching policy info for queue: " + queue); + + SubClusterPolicyConfiguration resp = + stateStoreFacade.getPolicyConfiguration(queue); + + if(resp == null){ + LOG.warn("Policy for queue {} does not exist", queue); + return Response.status(Response.Status.NOT_FOUND).build(); + } + + LOG.debug("Retrieved policy info for queue: " + queue + ". Policy details:" + + resp); + + return Response.status(Response.Status.OK).entity( + new SubClusterPolicyConfigurationDAO(resp)).build(); + } + + @Override + public Response setPolicyConfiguration( + SubClusterPolicyConfigurationDAO policyConf) throws YarnException { + + + String queue = policyConf.queueName; + + if (queue == null || queue.isEmpty()) { + throw new NotFoundException("queue name is empty or null"); + } + LOG.info("Setting policy info for queue: " + queue); + + try { + stateStoreFacade.setPolicyConfiguration( + policyConf.toSubClusterPolicyConfiguration()); + } catch (YarnException e) { + LOG.error("Could not set policy ", e); + throw e; + } + + LOG.info("Set policy info for queue: " + queue + ". Policy details:"); + return Response.status(Response.Status.OK).build(); + } + + @Override + public Response addApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException { + LOG.info("Adding home SubCluster for application {} as {}", + appHomeDAO.getAppId(), appHomeDAO.getSubClusterId().subClusterId); + SubClusterId ret; + try { + ret = stateStoreFacade.addApplicationHomeSubCluster( + appHomeDAO.toApplicationHomeSubCluster()); + } catch (YarnException e) { + LOG.error("Could not add home SubCluster ", e); + throw e; + } + LOG.info("Added home SubCluster for application {} as {}", + appHomeDAO.getAppId(), appHomeDAO.getSubClusterId().subClusterId); + return Response.status(Response.Status.OK).entity( + new SubClusterIdDAO(ret)).build(); + } + + @Override + public Response updateApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException { + LOG.info("Updating home SubCluster for application {} to {}", + appHomeDAO.getAppId(), appHomeDAO.getSubClusterId()); + SubClusterId ret; + try { + stateStoreFacade.updateApplicationHomeSubCluster( + appHomeDAO.toApplicationHomeSubCluster()); + } catch (YarnException e) { + LOG.error("Could not update home SubCluster ", e); + throw e; + } + LOG.info("Updating home SubCluster for application {} as {}", + appHomeDAO.getAppId(), appHomeDAO.getSubClusterId()); + return Response.status(Response.Status.OK).build(); + } + + @Override + public Response getApplicationHomeSubCluster( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID) + String appId) throws YarnException { + LOG.debug("Getting home SubCluster for application {}", appId); + SubClusterId resp; + try { + resp = stateStoreFacade.getApplicationHomeSubCluster(Apps.toAppID(appId)); + } catch (YarnException e) { + LOG.error("Could not get home SubCluster ", e); + throw e; + } + + if(resp == null){ + LOG.warn("Home subcluster for application {} does not exist", appId); + return Response.status(Response.Status.NOT_FOUND).build(); + } + + LOG.info("Retrieved home SubCluster for application {}", appId); + return Response.status(Response.Status.OK).entity( + new SubClusterIdDAO(resp)).build(); + } + + @Override + public Response getApplicationsHomeSubCluster() throws YarnException { + LOG.debug("Getting home SubCluster for applications"); + List resp; + try { + resp = stateStoreFacade.getApplicationsHomeSubCluster(); + } catch (YarnException e) { + LOG.error("Could not get home SubClusters ", e); + throw e; + } + + LOG.debug("Retrieved home SubCluster for applications {}", resp); + return Response.status(Response.Status.OK).entity( + new ApplicationsHomeSubClusterDAO(resp)).build(); + } + + @Override + public Response deleteApplicationHomeSubCluster( + @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID) + String appId) throws YarnException { + LOG.info("Deleting home SubCluster for application {}", appId); + try { + stateStoreFacade.deleteApplicationHomeSubCluster(Apps.toAppID(appId)); + } catch (YarnException e) { + LOG.error("Could not delete home SubCluster ", e); + throw e; + } + LOG.info("Deleted home SubCluster for application {}", appId); + return Response.status(Response.Status.OK).build(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyInterceptor.java new file mode 100644 index 00000000000..21b779c1f2f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyInterceptor.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.ssproxy; + +import org.apache.hadoop.conf.Configurable; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the client to + * the resource manager server. + * + * This class includes all methods that the StateStore implements but in + * REST format. + */ +public interface StateStoreProxyInterceptor extends Configurable, + StateStoreWebServiceProtocol { + + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param user the name of the client + */ + void init(String user); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any. + */ + void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call. + * + * @param nextInterceptor the RESTRequestInterceptor to set in the pipeline + */ + void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor); + + /** + * Returns the next intercepter in the chain. + * + * @return the next intercepter in the chain + */ + StateStoreProxyInterceptor getNextInterceptor(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java new file mode 100644 index 00000000000..78fc9abee5e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyRejectInterceptor.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.ssproxy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; + +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; + +/** + * Extends the {@code StateStoreProxyInterceptor} class and provides an + * implementation that simply rejects all the client requests. + */ +public class StateStoreProxyRejectInterceptor implements + StateStoreProxyInterceptor { + private FederationStateStoreFacade stateStoreFacade; + private @Context + Configuration conf; + + @Override + public void init(String user) { + this.stateStoreFacade = FederationStateStoreFacade.getInstance(); + } + + @Override + public void shutdown() { + + } + + @Override + public void setNextInterceptor(StateStoreProxyInterceptor nextInterceptor) { + throw new YarnRuntimeException( + this.getClass().getName() + " should be the last interceptor"); + } + + @Override + public StateStoreProxyInterceptor getNextInterceptor() { + return null; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public Response registerSubCluster(SubClusterInfoDAO scInfoDAO) + throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO) + throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO) + throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response getSubCluster(String subClusterId) throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response getSubClusters(boolean filterInactiveSubclusters) + throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response getPoliciesConfigurations() throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response getPolicyConfiguration(String queue) throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response setPolicyConfiguration( + SubClusterPolicyConfigurationDAO policyConf) throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response addApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response updateApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response getApplicationHomeSubCluster(String appId) + throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response getApplicationsHomeSubCluster() throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } + + @Override + public Response deleteApplicationHomeSubCluster(String appId) + throws YarnException { + return Response.status(501).entity("Request reached " + + StateStoreProxyRejectInterceptor.class.getName()).build(); + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java new file mode 100644 index 00000000000..053859ae8a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreWebServiceProtocol.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.ssproxy; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO; +import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO; + +import javax.ws.rs.core.Response; + +/** + * The protocol between clients and the FederationStateStore + * over REST APIs + */ +public interface StateStoreWebServiceProtocol { + + /** + * Register a subcluster by publishing capabilities as represented by + * {@code SubClusterInfo} to indicate participation in federation. This is + * typically done during initialization or restart/failover of the + * subcluster's ResourceManager. Upon successful registration, an + * identifier for the subcluster which is unique across the federated + * cluster is returned. The identifier is static, i.e. preserved across + * restarts and failover. + * + * @param scInfoDAO the capabilities of the subcluster that + * wants to participate in federation. The subcluster id is also + * specified in case registration is triggered by restart/failover + * @return response empty on successfully if registration was successful + * @throws YarnException if the request is invalid/fails + */ + Response registerSubCluster(SubClusterInfoDAO scInfoDAO) throws YarnException; + + /** + * Deregister a subcluster identified by {@code SubClusterId} to + * change state in federation. This can be done to mark the sub cluster lost, + * deregistered, or decommissioned. + * + * @param deregisterDAO - the request to deregister the + * sub-cluster from federation. + * @return response empty on successfully deregistering the subcluster state + * @throws YarnException if the request is invalid/fails + */ + Response deregisterSubCluster(SubClusterDeregisterDAO deregisterDAO) + throws YarnException; + + /** + * Periodic heartbeat from a ResourceManager participating in + * federation to indicate liveliness. The heartbeat publishes the current + * capabilities as represented by {@code SubClusterInfo} of the subcluster. + * Currently response is empty if the operation was successful, if not an + * exception reporting reason for a failure. + * + * @param heartbeatDAO the capabilities of the subcluster that + * wants to keep alive its participation in federation + * @return response currently empty on if heartbeat was successfully processed + * @throws YarnException if the request is invalid/fails + */ + Response subClusterHeartBeat(SubClusterHeartbeatDAO heartbeatDAO) + throws YarnException; + + /** + * Get the membership information of subcluster as identified by + * {@code SubClusterId}. The membership information includes the cluster + * endpoint and current capabilities as represented by {@code SubClusterInfo}. + * + * @param subClusterId the subcluster whose information is required + * @return the {@code SubClusterInfo}, or {@code null} if there is no mapping + * for the subcluster + * @throws YarnException if the request is invalid/fails + */ + Response getSubCluster(String subClusterId) throws YarnException; + + /** + * Get the membership information of all the subclusters that are + * currently participating in federation. The membership information includes + * the cluster endpoint and current capabilities as represented by + * {@code SubClusterInfo}. + * + * @param filterInactiveSubclusters whether to filter inactive sub clusters + * @return a map of {@code SubClusterInfo} keyed by the {@code SubClusterId} + * @throws YarnException if the request is invalid/fails + */ + Response getSubClusters(boolean filterInactiveSubclusters) + throws YarnException; + + /** + * Get a map of all queue-to-policy configurations. + * + * @return the policies for all currently active queues in the system + * @throws YarnException if the request is invalid/fails + */ + Response getPoliciesConfigurations() throws YarnException; + + /** + * Get the policy configuration for a given queue. + * + * @param queue the queue whose {@code SubClusterPolicyConfiguration} is + * required + * @return the {@code SubClusterPolicyConfiguration} for the specified queue, + * or {@code null} if there is no mapping for the queue + * @throws YarnException if the request is invalid/fails + */ + Response getPolicyConfiguration(String queue) throws YarnException; + /** + * Set the policy configuration for a given queue. + * + * @param policyConf the {@code SubClusterPolicyConfiguration} with the + * corresponding queue + * @return response empty on successfully updating the + * {@code SubClusterPolicyConfiguration} for the specified queue + * @throws YarnException if the request is invalid/fails + */ + + Response setPolicyConfiguration(SubClusterPolicyConfigurationDAO policyConf) + throws YarnException; + + /** + * Register the home {@code SubClusterId} of the newly submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. If a + * mapping for the application already existed, the {@code SubClusterId} in + * this response will return the existing mapping which might be different + * from that in the {@code AddApplicationHomeSubClusterRequest}. + * + * @param appHomeDAO the request to register a new application with its home + * sub-cluster + * @return upon successful registration of the application in the StateStore, + * {@code AddApplicationHomeSubClusterRequest} containing the home + * sub-cluster of the application. Otherwise, an exception reporting + * reason for a failure + * @throws YarnException if the request is invalid/fails + */ + Response addApplicationHomeSubCluster(ApplicationHomeSubClusterDAO appHomeDAO) + throws YarnException; + + /** + * Update the home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. + * + * @param appHomeDAO the request to update the home sub-cluster of an + * application. + * @return empty on successful update of the application in the StateStore, if + * not an exception reporting reason for a failure + * @throws YarnException if the request is invalid/fails + */ + Response updateApplicationHomeSubCluster( + ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException; + + /** + * Get information about the application identified by the input + * {@code ApplicationId}. + * + * @param appId the application to query + * @return {@code ApplicationHomeSubCluster} containing the application's home + * subcluster + * @throws YarnException if the request is invalid/fails + */ + Response getApplicationHomeSubCluster(String appId) throws YarnException; + + /** + * Get the {@code ApplicationHomeSubCluster} list representing the mapping of + * all submitted applications to it's home sub-cluster. + * + * @return the mapping of all submitted application to it's home sub-cluster + * @throws YarnException if the request is invalid/fails + */ + Response getApplicationsHomeSubCluster() throws YarnException; + + /** + * Delete the mapping of home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. + * + * @param appId the ID of the application to delete + * @return empty on successful update of the application in the StateStore, if + * not an exception reporting reason for a failure + * @throws YarnException if the request is invalid/fails + */ + Response deleteApplicationHomeSubCluster(String appId) throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java new file mode 100644 index 00000000000..ff50e1c18a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Router Federation StateStore Proxy package. **/ +package org.apache.hadoop.yarn.server.router.ssproxy; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java index ba07a1afba4..e8af9e4bb85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebApp.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver; import org.apache.hadoop.yarn.server.router.Router; +import org.apache.hadoop.yarn.server.router.ssproxy.RouterStateStoreProxyWebServices; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.YarnWebParams; @@ -38,6 +39,7 @@ public RouterWebApp(Router router) { public void setup() { bind(JAXBContextResolver.class); bind(RouterWebServices.class); + bind(RouterStateStoreProxyWebServices.class); bind(GenericExceptionHandler.class); bind(RouterWebApp.class).toInstance(this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java new file mode 100644 index 00000000000..a3c557126f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/ssproxy/TestRouterStateStoreProxy.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.ssproxy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.FederationStateStoreBaseTest; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.router.Router; +import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStore; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Validate the correctness of the StateStoreProxy in the router. + * This test case works by setting the facade to use a MemoryStateStore, + * and then testing the HttpProxyFederationStateStore, which communicates + * via REST with the StateStoreProxyWebServices + */ +public class TestRouterStateStoreProxy extends + FederationStateStoreBaseTest{ + + private static Router router; + + private static FederationStateStore proxyStore; + + public TestRouterStateStoreProxy(){ + } + + @BeforeClass + public static void setUpProxy(){ + // Set up the router proxy to use a memory state store + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + MemoryFederationStateStore.class.getName()); + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + conf.set(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE, + StateStoreProxyDefaultInterceptor.class.getName()); + proxyStore = new MemoryFederationStateStore(); + FederationStateStoreFacade.getInstance().reinitialize(proxyStore, conf); + router = new Router(); + router.init(conf); + router.start(); + } + + @AfterClass + public static void tearDownProxy(){ + router.stop(); + } + + @Before + @Override + public void before() throws IOException, YarnException { + super.before(); + proxyStore.init(getConf()); + } + + @After + @Override + public void after() throws Exception{ + super.after(); + proxyStore.close(); + } + + @Override + protected FederationStateStore createStateStore(){ + Configuration conf = new Configuration(); + + // Set up our local state store + super.setConf(conf); + return new HttpProxyFederationStateStore(); + } + +} +