diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 5693342..c840150 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; @@ -61,9 +62,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,6 +221,22 @@ public static FederationStateStoreFacade getInstance() { } /** + * Deregister a subcluster identified by {@code SubClusterId} to + * change state in federation. This can be done to mark the sub cluster lost, + * deregistered, or decommissioned. + * + * @param subClusterId the target subclusterId + * @param subClusterState the state to update it to + * @throws YarnException if the request is invalid/fails + */ + public void deregisterSubCluster(SubClusterId subClusterId, + SubClusterState subClusterState) throws YarnException { + stateStore.deregisterSubCluster( + SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState)); + return; + } + + /** * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. * * @param subClusterId the identifier of the sub-cluster @@ -247,8 +266,7 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId) public SubClusterInfo getSubCluster(final SubClusterId subClusterId, final boolean flushCache) throws YarnException { if (flushCache && isCachingEnabled()) { - LOG.info("Flushing subClusters from cache and rehydrating from store," - + " most likely on account of RM failover."); + LOG.info("Flushing subClusters from cache and rehydrating from store."); cache.remove(buildGetSubClustersCacheRequest(false)); } return getSubCluster(subClusterId); @@ -279,6 +297,26 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId, } /** + * Updates the cache with the central {@link FederationStateStore} and returns + * the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @param flushCache flag to indicate if the cache should be flushed or not + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public Map getSubClusters( + final boolean filterInactiveSubClusters, final boolean flushCache) + throws YarnException { + if (flushCache && isCachingEnabled()) { + LOG.info("Flushing subClusters from cache and rehydrating from store."); + cache.remove(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } + return getSubClusters(filterInactiveSubClusters); + } + + /** * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. * * @param queue the queue whose policy is required @@ -375,6 +413,21 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) } /** + * Delete the mapping of home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. + * + * @param applicationId the application to delete the home sub-cluster of + * @throws YarnException if the request is invalid/fails + */ + public void deleteApplicationHomeSubCluster(ApplicationId applicationId) + throws YarnException { + stateStore.deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest.newInstance(applicationId)); + return; + } + + /** * Get the singleton instance of SubClusterResolver. * * @return SubClusterResolver instance