diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index eabe413..8e79f89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3160,7 +3160,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; - private static final String FEDERATION_GPG_PREFIX = + public static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg."; // The number of threads to use for the GPG scheduled executor service @@ -3168,6 +3168,12 @@ public static boolean isAclEnabled(Configuration conf) { FEDERATION_GPG_PREFIX + "scheduled.executor.threads"; public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10; + // The StateStore interface class for GPG + public static final String GPG_STATESTORE_CLIENT_CLASS = + FEDERATION_GPG_PREFIX + "state-store.class"; + public static final String DEFAULT_GPG_STATESTORE_CLIENT_CLASS = + "org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationGPGStateStore"; + // The interval at which the subcluster cleaner runs, -1 means disabled public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; @@ -3178,6 +3184,47 @@ public static boolean isAclEnabled(Configuration conf) { FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + // The interval at which the application cleaner runs, -1 means disabled + public static final String GPG_APPCLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms"; + public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = -1; + + /** + * The time before a created application is removed from the state store, + * default two weeks (14 days) + */ + public static final String GPG_APPCLEANER_CREATION_EXPIRATION_MS = + FEDERATION_GPG_PREFIX + "application.cleaner.creation.expiration-ms"; + public static final long DEFAULT_GPG_APPCLEANER_CREATION_EXPIRATION_MS = + 1209600000; + + /** + * The minimum number of successful query for running applications from Router + * required before we delete applications. We need to do this because Router + * might return partial application list because some sub-cluster RM is not + * responsive (e.g. failing over). + */ + public static final String GPG_APPCLEANER_MIN_ROUTER_SUCCESS = + FEDERATION_GPG_PREFIX + "application.cleaner.min-router-success"; + public static final int DEFAULT_GPG_APPCLEANER_MIN_ROUTER_SUCCESS = 3; + + /** + * The max number of retries for Router query before an AppCleaner attempt + * give up. + */ + public static final String GPG_APPCLEANER_MAX_ROUTER_RETRY = + FEDERATION_GPG_PREFIX + "application.cleaner.max-router-retry"; + public static final int DEFAULT_GPG_APPCLEANER_MAX_ROUTER_RETRY = 10; + + /** + * The interval in between Router queries in AppCleaner. Default is 20 + * minutes. 
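+ *
+ * A minimal sketch (hypothetical values) of enabling the cleaner keys added
+ * in this hunk:
+ * <pre>
+ *   YarnConfiguration conf = new YarnConfiguration();
+ *   // the cleaner is disabled (-1) by default; run it hourly
+ *   conf.setLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS, 3600000L);
+ *   // accept 3 successful Router queries out of at most 10 attempts
+ *   conf.setInt(YarnConfiguration.GPG_APPCLEANER_MIN_ROUTER_SUCCESS, 3);
+ *   conf.setInt(YarnConfiguration.GPG_APPCLEANER_MAX_ROUTER_RETRY, 10);
+ * </pre>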
+ */ + public static final String GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS = + FEDERATION_GPG_PREFIX + "application.cleaner.router-retry-inteval-ms"; + public static final long DEFAULT_GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS = + 1200000; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationGPGStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationGPGStateStore.java new file mode 100644 index 0000000..8703964 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationGPGStateStore.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse; + +public interface FederationGPGStateStore extends FederationStateStore { + + /** + * Get the application information (e.g. start time) of all applications. 
+ * + * @param request empty representing all applications + * @return application info of all submitted applications + * @throws YarnException if the request is invalid/fails + */ + GetApplicationsInfoResponse getApplicationsInfo( + GetApplicationsInfoRequest request) throws YarnException; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationGPGStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationGPGStateStore.java new file mode 100644 index 0000000..c87eba2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationGPGStateStore.java @@ -0,0 +1,104 @@ +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +import com.google.common.annotations.VisibleForTesting; + +public class MemoryFederationGPGStateStore extends MemoryFederationStateStore + implements FederationGPGStateStore { + + private Map applications; + private final Clock clock = SystemClock.getInstance(); + + @Override + public void init(Configuration conf) { + super.init(conf); + applications = new ConcurrentHashMap(); + } + + @Override + public GetApplicationsInfoResponse getApplicationsInfo( + GetApplicationsInfoRequest request) throws YarnException { + List result = + new ArrayList(applications.values()); + return GetApplicationsInfoResponse.newInstance(result); + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + if (!applications.containsKey(appId)) { + applications.put(appId, + ApplicationInfo.newInstance(appId, + 
request.getApplicationHomeSubCluster().getHomeSubCluster(), + clock.getTime())); + } + return super.addApplicationHomeSubCluster(request); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + if (!applications.containsKey(appId)) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + applications.put(appId, + ApplicationInfo.newInstance(appId, + request.getApplicationHomeSubCluster().getHomeSubCluster(), + clock.getTime())); + return super.updateApplicationHomeSubCluster(request); + } + + @VisibleForTesting + public void setSubClusterLastHeartbeat(SubClusterId subClusterId, + long lastHearbeat) throws YarnException { + GetSubClusterInfoResponse response = + getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)); + SubClusterInfo subClusterInfo = response.getSubClusterInfo(); + if (subClusterInfo == null) { + throw new YarnException( + "Subcluster " + subClusterId.toString() + " does not exist"); + } + subClusterInfo.setLastHeartBeat(lastHearbeat); + } + + @VisibleForTesting + public void setApplicationCreateTime(ApplicationId appId, long createTime) + throws YarnException { + ApplicationInfo appInfo = applications.get(appId); + if (appInfo == null) { + throw new YarnException( + "Application " + appId.toString() + " does not exist"); + } + appInfo.setCreateTime(createTime); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index b42fc79..493874c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,8 +78,6 @@ private Map applications; private Map policies; - private final MonotonicClock clock = new MonotonicClock(); - public static final Logger LOG = LoggerFactory.getLogger(MemoryFederationStateStore.class); @@ -267,7 +264,6 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( @Override public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { - FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationGPGStateStore.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationGPGStateStore.java new file mode 100644 index 0000000..b9249c7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationGPGStateStore.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * SQL implementation of {@link FederationGPGStateStore}. 
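+ *
+ * A minimal usage sketch, assuming {@code conf} already carries the SQL
+ * state-store connection settings:
+ * <pre>
+ *   FederationGPGStateStore store = new SQLFederationGPGStateStore();
+ *   store.init(conf);
+ *   GetApplicationsInfoResponse response =
+ *       store.getApplicationsInfo(GetApplicationsInfoRequest.newInstance());
+ *   for (ApplicationInfo info : response.getAppsInfo()) {
+ *     // each record carries the application id, home sub-cluster and
+ *     // creation time used by the GPG ApplicationCleaner
+ *     System.out.println(info.getApplicationId() + " -> "
+ *         + info.getHomeSubCluster() + " @ " + info.getCreateTime());
+ *   }
+ * </pre>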
+ */ +public class SQLFederationGPGStateStore extends SQLFederationStateStore + implements FederationGPGStateStore { + + private static final String CALL_SP_GET_APPLICATIONS_INFO = + "{call sp_getApplicationsHomeSubCluster()}"; + + private static RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + @Override + public GetApplicationsInfoResponse getApplicationsInfo( + GetApplicationsInfoRequest request) throws YarnException { + CallableStatement cstmt = null; + Connection conn = null; + List appsInfoList = new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_INFO); + ResultSet rs = cstmt.executeQuery(); + + boolean hasCreateTime = false; + long createTime = 0; + if (rs.getMetaData().getColumnCount() >= 3) { + LOG.info("Third column name in table ApplicationsHomeSubCluster is: " + + rs.getMetaData().getColumnName(3)); + hasCreateTime = true; + } else { + LOG.warn("Application create time does not present in table" + + " ApplicationsHomeSubCluster"); + } + while (rs.next()) { + + String applicationId = rs.getString(1); + String homeSubCluster = rs.getString(2); + if (hasCreateTime) { + createTime = rs.getTimestamp(3, utcCalendar).getTime(); + } + + appsInfoList.add(ApplicationInfo.newInstance( + ConverterUtils.toApplicationId(recordFactory, applicationId), + SubClusterId.newInstance(homeSubCluster), createTime)); + } + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the applications ", e); + } finally { + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetApplicationsInfoResponse.newInstance(appsInfoList); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 533f9c8..be1857b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -129,7 +129,7 @@ private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS = "{call sp_getPoliciesConfigurations()}"; - private Calendar utcCalendar = + protected Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); // SQL database configurations diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationInfo.java new file mode 100644 index 0000000..a38dd2f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationInfo.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * ApplicationInfo is a report of the runtime information of the application
+ * that is running in the federated cluster.
+ *
+ * It includes information such as:
+ * <ul>
+ * <li>{@link ApplicationId}</li>
+ * <li>{@link SubClusterId}</li>
+ * <li>CreateTime</li>
+ * </ul>
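+ *
+ * A construction sketch with made-up values:
+ * <pre>
+ *   ApplicationInfo info = ApplicationInfo.newInstance(
+ *       ApplicationId.newInstance(System.currentTimeMillis(), 1),
+ *       SubClusterId.newInstance("SC-1"),
+ *       System.currentTimeMillis());
+ * </pre>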
+ * + */ +@Private +@Unstable +public abstract class ApplicationInfo { + + @Private + @Unstable + public static ApplicationInfo newInstance(ApplicationId appId, + SubClusterId homeSubCluster, long createTime) { + ApplicationInfo appInfo = Records.newRecord(ApplicationInfo.class); + appInfo.setApplicationId(appId); + appInfo.setHomeSubCluster(homeSubCluster); + appInfo.setCreateTime(createTime); + return appInfo; + } + + /** + * Get the {@link ApplicationId} representing the unique identifier of the + * application. + * + * @return the application identifier + */ + @Public + @Unstable + public abstract ApplicationId getApplicationId(); + + /** + * Set the {@link ApplicationId} representing the unique identifier of the + * application. + * + * @param applicationId the application identifier + */ + @Private + @Unstable + public abstract void setApplicationId(ApplicationId applicationId); + + /** + * Get the {@link SubClusterId} representing the unique identifier of the home + * subcluster in which the ApplicationMaster of the application is running. + * + * @return the home subcluster identifier + */ + @Public + @Unstable + public abstract SubClusterId getHomeSubCluster(); + + /** + * Set the {@link SubClusterId} representing the unique identifier of the home + * subcluster in which the ApplicationMaster of the application is running. + * + * @param homeSubCluster the home subcluster identifier + */ + @Private + @Unstable + public abstract void setHomeSubCluster(SubClusterId homeSubCluster); + + /** + * Get the creation time of the application. + * + * @return the creation time + */ + @Public + @Unstable + public abstract long getCreateTime(); + + /** + * Set the creation time of the application. + * + * @param createTime the creation time + */ + @Private + @Unstable + public abstract void setCreateTime(long createTime); + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ApplicationInfo other = (ApplicationInfo) obj; + if (!this.getApplicationId().equals(other.getApplicationId())) { + return false; + } + if (!this.getHomeSubCluster().equals(other.getHomeSubCluster())) { + return false; + } + return this.getCreateTime() == other.getCreateTime(); + } + + @Override + public int hashCode() { + return (getApplicationId().hashCode() * 31 + getHomeSubCluster().hashCode()) + * 37 + (int) getCreateTime(); + } + + @Override + public String toString() { + return "ApplicationInfo [getApplicationId()=" + getApplicationId() + + ", getHomeSubCluster()=" + getHomeSubCluster() + "], getCreateTime()=" + + getCreateTime(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsInfoRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsInfoRequest.java new file mode 100644 index 0000000..541f0e5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsInfoRequest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * Request class to obtain the application info of all active applications. + */ +@Private +@Unstable +public abstract class GetApplicationsInfoRequest { + + @Private + @Unstable + public static GetApplicationsInfoRequest newInstance() { + GetApplicationsInfoRequest request = + Records.newRecord(GetApplicationsInfoRequest.class); + return request; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsInfoResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsInfoResponse.java new file mode 100644 index 0000000..cabedc5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsInfoResponse.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The response sent by Federation state store to a query for the
+ * info of all submitted applications.
+ *
+ * The response includes the mapping details, i.e.:
+ * <ul>
+ * <li>{@code ApplicationId}</li>
+ * <li>{@code SubClusterId}</li>
+ * <li>CreateTime</li>
+ * </ul>
+ */ +@Private +@Unstable +public abstract class GetApplicationsInfoResponse { + + @Private + @Unstable + public static GetApplicationsInfoResponse newInstance( + List appsInfo) { + GetApplicationsInfoResponse response = + Records.newRecord(GetApplicationsInfoResponse.class); + response.setAppsInfo(appsInfo); + return response; + } + + /** + * Get the {@link ApplicationInfo} list representing the information of all + * submitted applications. + * + * @return the ApplicationInfo list. + */ + @Public + @Unstable + public abstract List getAppsInfo(); + + /** + * Set the {@link ApplicationInfo} list representing the information of all + * submitted applications. + * + * @param appsInfo the info list of all submitted application. + */ + @Private + @Unstable + public abstract void setAppsInfo(List appsInfo); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationInfoPBImpl.java new file mode 100644 index 0000000..565ae87 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationInfoPBImpl.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationInfoProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationInfoProtoOrBuilder; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; + +import com.google.protobuf.TextFormat; + +/** + * Protocol buffer based implementation of {@link ApplicationInfo}. + */ +@Private +@Unstable +public class ApplicationInfoPBImpl extends ApplicationInfo { + + private ApplicationInfoProto proto = + ApplicationInfoProto.getDefaultInstance(); + private ApplicationInfoProto.Builder builder = null; + private boolean viaProto = false; + + private ApplicationId applicationId = null; + private SubClusterId homeSubCluster = null; + + public ApplicationInfoPBImpl() { + builder = ApplicationInfoProto.newBuilder(); + } + + public ApplicationInfoPBImpl(ApplicationInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public ApplicationInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ApplicationInfoProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.homeSubCluster != null) { + builder.setHomeSubCluster(convertToProtoFormat(this.homeSubCluster)); + } + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ApplicationId getApplicationId() { + ApplicationInfoProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + return; + } + this.applicationId = applicationId; + } + + @Override + public SubClusterId getHomeSubCluster() { + ApplicationInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.homeSubCluster != null) { + return this.homeSubCluster; + } + if (!p.hasHomeSubCluster()) { + return null; + } + this.homeSubCluster = convertFromProtoFormat(p.getHomeSubCluster()); + return this.homeSubCluster; + } + + @Override + public void setHomeSubCluster(SubClusterId homeSubCluster) { + maybeInitBuilder(); + if (homeSubCluster == null) { + builder.clearHomeSubCluster(); + } + this.homeSubCluster = homeSubCluster; + } + + @Override + public long getCreateTime() { + ApplicationInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getCreateTime(); + } + + @Override + public void setCreateTime(long createTime) { + maybeInitBuilder(); + builder.setCreateTime(createTime); + } + + private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) { + return new SubClusterIdPBImpl(subClusterId); + } + + private SubClusterIdProto convertToProtoFormat(SubClusterId subClusterId) { + return ((SubClusterIdPBImpl) subClusterId).getProto(); + } + + private ApplicationId convertFromProtoFormat(ApplicationIdProto appId) { + return new ApplicationIdPBImpl(appId); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId appId) { + return ((ApplicationIdPBImpl) appId).getProto(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoRequestPBImpl.java new file mode 100644 index 0000000..8615b77 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoRequestPBImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsInfoRequestProto; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest; + +import com.google.protobuf.TextFormat; + +/** + * Protocol buffer based implementation of + * {@link GetApplicationsInfoRequest}. + */ +@Private +@Unstable +public class GetApplicationsInfoRequestPBImpl + extends GetApplicationsInfoRequest { + + private GetApplicationsInfoRequestProto proto = + GetApplicationsInfoRequestProto.getDefaultInstance(); + private GetApplicationsInfoRequestProto.Builder builder = null; + private boolean viaProto = false; + + public GetApplicationsInfoRequestPBImpl() { + builder = GetApplicationsInfoRequestProto.newBuilder(); + } + + public GetApplicationsInfoRequestPBImpl( + GetApplicationsInfoRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetApplicationsInfoRequestProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoResponsePBImpl.java new file mode 100644 index 0000000..bc24d2c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoResponsePBImpl.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationInfoProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsInfoResponseProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsInfoResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse; + +import com.google.protobuf.TextFormat; + +/** + * Protocol buffer based implementation of + * {@link GetApplicationsInfoResponse}. + */ +@Private +@Unstable +public class GetApplicationsInfoResponsePBImpl + extends GetApplicationsInfoResponse { + + private GetApplicationsInfoResponseProto proto = + GetApplicationsInfoResponseProto.getDefaultInstance(); + private GetApplicationsInfoResponseProto.Builder builder = null; + private boolean viaProto = false; + + private List appsInfo; + + public GetApplicationsInfoResponsePBImpl() { + builder = GetApplicationsInfoResponseProto.newBuilder(); + } + + public GetApplicationsInfoResponsePBImpl( + GetApplicationsInfoResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetApplicationsInfoResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetApplicationsInfoResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.appsInfo != null) { + addAppsInfoToProto(); + } + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public List getAppsInfo() { + initSubClustersInfoList(); + return appsInfo; + } + + @Override + public void setAppsInfo(List appsInfo) { + maybeInitBuilder(); + if (appsInfo == null) { + builder.clearAppsInfoList(); + return; + } + this.appsInfo = appsInfo; + } + + private void initSubClustersInfoList() { + if (this.appsInfo != null) { + return; + } + GetApplicationsInfoResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List appInfosList = p.getAppsInfoListList(); + appsInfo = new ArrayList(); + + for (ApplicationInfoProto r : appInfosList) { + appsInfo.add(convertFromProtoFormat(r)); + } + } + + private void addAppsInfoToProto() { + maybeInitBuilder(); + builder.clearAppsInfoList(); + if (appsInfo == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + private Iterator iter = appsInfo.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationInfoProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + + } + + }; + builder.addAllAppsInfoList(iterable); + } + + private ApplicationInfo convertFromProtoFormat(ApplicationInfoProto sc) { + return new ApplicationInfoPBImpl(sc); + } + + private ApplicationInfoProto convertToProtoFormat(ApplicationInfo sc) { + return ((ApplicationInfoPBImpl) sc).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index ef77114..ff17364 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; @@ -420,6 +421,21 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) } /** + * Delete the mapping of home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation is + * successful, if not an exception reporting reason for a failure. + * + * @param applicationId the application to delete the home sub-cluster of + * @throws YarnException if the request is invalid/fails + */ + public void deleteApplicationHomeSubCluster(ApplicationId applicationId) + throws YarnException { + stateStore.deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest.newInstance(applicationId)); + return; + } + + /** * Get the singleton instance of SubClusterResolver. 
* * @return SubClusterResolver instance diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto index cedf482..67fb52e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto @@ -97,6 +97,12 @@ message ApplicationHomeSubClusterProto { optional SubClusterIdProto home_sub_cluster = 2; } +message ApplicationInfoProto { + optional ApplicationIdProto application_id = 1; + optional SubClusterIdProto home_sub_cluster = 2; + optional int64 createTime = 3; +} + message AddApplicationHomeSubClusterRequestProto { optional ApplicationHomeSubClusterProto app_subcluster_map = 1; } @@ -128,6 +134,13 @@ message GetApplicationsHomeSubClusterResponseProto { repeated ApplicationHomeSubClusterProto app_subcluster_map = 1; } +message GetApplicationsInfoRequestProto { + +} + +message GetApplicationsInfoResponseProto { + repeated ApplicationInfoProto apps_info_list = 1; +} message DeleteApplicationHomeSubClusterRequestProto { optional ApplicationIdProto application_id = 1; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 15cc0f0..ded1eff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -516,7 +516,7 @@ private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb); } - private void addApplicationHomeSC(ApplicationId appId, + protected void addApplicationHomeSC(ApplicationId appId, SubClusterId subClusterId) throws YarnException { ApplicationHomeSubCluster ahsc = ApplicationHomeSubCluster.newInstance(appId, subClusterId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index 289a3a6..9958492 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -23,14 +23,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; /** - * HSQLDB implementation of {@link FederationStateStore}. + * HSQLDB implementation of {@link FederationGPGStateStore}. */ -public class HSQLDBFederationStateStore extends SQLFederationStateStore { +public class HSQLDBFederationStateStore extends SQLFederationGPGStateStore { private static final Logger LOG = LoggerFactory.getLogger(HSQLDBFederationStateStore.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationGPGStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationGPGStateStore.java new file mode 100644 index 0000000..baef886 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationGPGStateStore.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit tests for SQLFederationGPGStateStore. + */ +public class TestSQLFederationGPGStateStore + extends FederationStateStoreBaseTest { + + private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; + private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; + private static final String DATABASE_USERNAME = "SA"; + private static final String DATABASE_PASSWORD = ""; + + private FederationGPGStateStore sqlStateStore; + + @Override + protected FederationStateStore createStateStore() { + + YarnConfiguration conf = new YarnConfiguration(); + + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + HSQLDB_DRIVER); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, + DATABASE_USERNAME); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, + DATABASE_PASSWORD); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, + DATABASE_URL + System.currentTimeMillis()); + super.setConf(conf); + + sqlStateStore = new HSQLDBFederationStateStore(); + return sqlStateStore; + } + + @Test + public void testGetApplicationsInfo() throws Exception { + ApplicationId appId1 = ApplicationId.newInstance(1, 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + ApplicationInfo ai1 = ApplicationInfo.newInstance(appId1, subClusterId1, 0); + + ApplicationId appId2 = ApplicationId.newInstance(1, 2); + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + ApplicationInfo ai2 = ApplicationInfo.newInstance(appId2, subClusterId2, 0); + + addApplicationHomeSC(appId1, subClusterId1); + addApplicationHomeSC(appId2, subClusterId2); + + GetApplicationsInfoRequest getRequest = + GetApplicationsInfoRequest.newInstance(); + + GetApplicationsInfoResponse result = + sqlStateStore.getApplicationsInfo(getRequest); + + Assert.assertEquals(2, result.getAppsInfo().size()); + Assert.assertTrue(result.getAppsInfo().contains(ai1)); + Assert.assertTrue(result.getAppsInfo().contains(ai2)); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java new file mode 100644 index 0000000..8d43022 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +/** + * GPGUtils contains utility functions for the GPG + * + */ +public final class GPGUtils { + + // hide constructor + private GPGUtils() { + } + + /** + * Performs an invocation of the the remote RMWebService. + */ + public static T invokeRMWebService(Configuration conf, String webAddr, + String path, final Class returnType) { + Client client = Client.create(); + T obj = null; + + WebResource webResource = client.resource(webAddr); + ClientResponse response = webResource.path("ws/v1/cluster").path(path) + .accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + if (response.getClientResponseStatus() == ClientResponse.Status.OK) { + obj = response.getEntity(returnType); + } else { + throw new YarnRuntimeException("Bad response from remote web service: " + + response.getClientResponseStatus()); + } + return obj; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index f6cfba0..2f375ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ // Scheduler service that runs 
tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; private SubClusterCleaner subClusterCleaner; + private ApplicationCleaner applicationCleaner; public GlobalPolicyGenerator() { super(GlobalPolicyGenerator.class.getName()); @@ -78,6 +80,7 @@ protected void serviceInit(Configuration conf) throws Exception { conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + this.applicationCleaner = new ApplicationCleaner(conf, this.gpgContext); DefaultMetricsSystem.initialize(METRICS_NAME); @@ -89,7 +92,7 @@ protected void serviceInit(Configuration conf) throws Exception { protected void serviceStart() throws Exception { super.serviceStart(); - // Scheduler SubClusterCleaner service + // Schedule the SubClusterCleaner service long scCleanerIntervalMs = getConfig().getLong( YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); @@ -99,6 +102,17 @@ protected void serviceStart() throws Exception { LOG.info("Scheduled sub-cluster cleaner with interval: {}", DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); } + + // Schedule the ApplicationCleaner service + long appCleanerIntervalMs = + getConfig().getLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS, + YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS); + if (appCleanerIntervalMs > 0) { + this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner, + 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS); + LOG.info("Scheduled application cleaner with interval: {}", + DurationFormatUtils.formatDurationISO(appCleanerIntervalMs)); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java new file mode 100644 index 0000000..721e8a2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * the applicationsHomeSubCluster table in FederationStateStore. Specifically,
+ * an application is removed when both of the following hold:
+ *
+ * 1. It is not in a running state.
+ *
+ * 2. Its creation time stamp is older than a configured threshold.
+ */
+public class ApplicationCleaner implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationCleaner.class);
+
+  private Configuration conf;
+  private GPGContext gpgContext;
+  private long creationExpirationMillis;
+
+  private int minRouterSuccessCount;
+  private int maxRouterRetry;
+  private long routerQueryIntevalMillis;
+
+  private FederationGPGStateStore stateStore;
+
+  public ApplicationCleaner(Configuration conf, GPGContext gpgContext)
+      throws YarnException {
+
+    this.gpgContext = gpgContext;
+    this.conf = conf;
+
+    this.creationExpirationMillis =
+        conf.getLong(YarnConfiguration.GPG_APPCLEANER_CREATION_EXPIRATION_MS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CREATION_EXPIRATION_MS);
+
+    this.minRouterSuccessCount =
+        conf.getInt(YarnConfiguration.GPG_APPCLEANER_MIN_ROUTER_SUCCESS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_MIN_ROUTER_SUCCESS);
+
+    this.maxRouterRetry =
+        conf.getInt(YarnConfiguration.GPG_APPCLEANER_MAX_ROUTER_RETRY,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_MAX_ROUTER_RETRY);
+
+    this.routerQueryIntevalMillis =
+        conf.getLong(YarnConfiguration.GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS);
+
+    this.stateStore = (FederationGPGStateStore) FederationStateStoreFacade
+        .createRetryInstance(this.conf,
+            YarnConfiguration.GPG_STATESTORE_CLIENT_CLASS,
+            YarnConfiguration.DEFAULT_GPG_STATESTORE_CLIENT_CLASS,
+            FederationGPGStateStore.class,
+            FederationStateStoreFacade.createRetryPolicy(conf));
+    this.stateStore.init(conf);
+
+    LOG.info(
+        "Initialized AppCleaner with application creation expiration of {}",
+        DurationFormatUtils.formatDurationISO(this.creationExpirationMillis));
+    LOG.info(
+        "Router query with min success {}, max retry {}, retry interval {}",
+        this.minRouterSuccessCount, this.maxRouterRetry,
+        DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
+
+    if (this.minRouterSuccessCount > this.maxRouterRetry) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount
+          + " should not be larger than maxRouterRetry "
+          + this.maxRouterRetry);
+    }
+    if (this.minRouterSuccessCount <= 0) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount + " should be positive");
+    }
+  }
+
+  @VisibleForTesting
+  public void setGpgStateStore(FederationGPGStateStore gpgStateStore)
+      throws Exception {
+    if (this.stateStore != null) {
+      this.stateStore.close();
+    }
+    this.stateStore = gpgStateStore;
+  }
+
+  /**
+   * Query the Router for applications in certain states.
+   *
+   * @param state the application states to look for
+   * @return the set of matching application ids
+   * @throws YarnRuntimeException when the Router call fails
+   */
+  public Set<ApplicationId> getAppsFromRouter(Set<YarnApplicationState> state)
+      throws YarnRuntimeException {
+    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+    LOG.info("Contacting router at: {}", webAppAddress);
+    AppsInfo appsInfo = GPGUtils.invokeRMWebService(conf, webAppAddress, "apps",
+        AppsInfo.class);
+
+    Set<ApplicationId> appSet = new HashSet<>();
+    for (AppInfo appInfo : appsInfo.getApps()) {
+      if (state.contains(appInfo.getState())) {
+        appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+      }
+    }
+    return appSet;
+  }
+
+  /**
+   * Get the set of running applications in the cluster.
+   *
+   * @return the set of running applications
+   * @throws YarnException if the required number of successful Router
+   *           queries cannot be reached
+   */
+  public Set<ApplicationId> getRunningApplications() throws YarnException {
+    int successCount = 0, totalAttemptCount = 0;
+    Set<ApplicationId> resultSet = new HashSet<>();
+    while (totalAttemptCount < this.maxRouterRetry) {
+      try {
+        Set<ApplicationId> runningApps =
+            getAppsFromRouter(EnumSet.of(YarnApplicationState.RUNNING));
+        resultSet.addAll(runningApps);
+        LOG.info("Attempt {}: {} running apps from Router, {} in total",
+            totalAttemptCount, runningApps.size(), resultSet.size());
+
+        successCount++;
+        if (successCount >= this.minRouterSuccessCount) {
+          return resultSet;
+        }
+
+        // Wait for the next attempt
+        try {
+          Thread.sleep(this.routerQueryIntevalMillis);
+        } catch (InterruptedException e) {
+          LOG.warn("Sleep interrupted after attempt " + totalAttemptCount);
+        }
+      } catch (Exception e) {
+        LOG.warn("Router query attempt " + totalAttemptCount + " failed", e);
+      } finally {
+        totalAttemptCount++;
+      }
+    }
+    throw new YarnException("Only " + successCount
+        + " successful Router queries after " + totalAttemptCount
+        + " retries");
+  }
+
+  @Override
+  public void run() {
+
+    GetApplicationsInfoResponse response = null;
+    Set<ApplicationId> appIdToDelete = new HashSet<>();
+
+    Date now = new Date();
+    LOG.info("Application cleaner run at local time {}", now);
+
+    try {
+      response = this.stateStore
+          .getApplicationsInfo(GetApplicationsInfoRequest.newInstance());
+      LOG.info("Statestore has {} apps in total",
+          response.getAppsInfo().size());
+
+      for (ApplicationInfo appInfo : response.getAppsInfo()) {
+        if (appInfo.getCreateTime() == 0) {
+          LOG.error("Application {} has a zero create time",
+              appInfo.getApplicationId());
+          continue;
+        }
+        Date startTime = new Date(appInfo.getCreateTime());
+        long diffMs = now.getTime() - startTime.getTime();
+
+        if (diffMs > this.creationExpirationMillis) {
+          appIdToDelete.add(appInfo.getApplicationId());
+          LOG.info("Candidate for deletion {}: created at {}, {} ago",
+              appInfo.getApplicationId(), startTime.toString(),
+              DurationFormatUtils.formatDurationISO(diffMs));
+        }
+      }
+      LOG.info("{} candidates for deletion", appIdToDelete.size());
+
+      Set<ApplicationId> runningApps = getRunningApplications();
+      LOG.info("{} running applications in the cluster", runningApps.size());
+      for (ApplicationId appId : runningApps) {
+        LOG.debug("Running application {} in the cluster", appId.toString());
+      }
+
+      appIdToDelete = Sets.difference(appIdToDelete, runningApps);
+      LOG.info("Deleting {} applications from statestore",
+          appIdToDelete.size());
+      for (ApplicationId appId : appIdToDelete) {
+        try {
+          this.gpgContext.getStateStoreFacade()
+              .deleteApplicationHomeSubCluster(appId);
+        } catch (Exception e) {
+          LOG.error(
+              "deleteApplicationHomeSubCluster failed at application " + appId,
+              e);
+        }
+      }
+
+    } catch (Throwable e) {
+      LOG.error("Application cleaner run at time " + now + " failed: ", e);
+    }
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
new file mode 100644
index 0000000..dd302c8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestApplicationCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestApplicationCleaner.java
new file mode 100644
index 0000000..1cb3dcb
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestApplicationCleaner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationGPGStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for the ApplicationCleaner in GPG.
+ */
+public class TestApplicationCleaner {
+  private Configuration conf;
+  private MemoryFederationGPGStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private ApplicationCleaner appCleaner;
+  private GPGContext gpgContext;
+
+  private List<ApplicationId> appIds;
+  // The set of running applications returned by the mocked Router
+  private Set<ApplicationId> runningAppIds;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // Applications expire in one second
+    conf.setLong(YarnConfiguration.GPG_APPCLEANER_CREATION_EXPIRATION_MS,
+        1000);
+
+    // No wait between Router query retries
+    conf.setLong(YarnConfiguration.GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS, 0);
+
+    stateStore = new MemoryFederationGPGStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    appCleaner = new TestableApplicationCleaner(conf, gpgContext);
+    appCleaner.setGpgStateStore(stateStore);
+
+    runningAppIds = new HashSet<>();
+
+    appIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      appIds.add(appId);
+
+      SubClusterId subClusterId =
+          SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i));
+
+      stateStore.addApplicationHomeSubCluster(
+          AddApplicationHomeSubClusterRequest.newInstance(
+              ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+    }
+  }
+
+  @After
+  public void breakDown() throws Exception {
+    stateStore.close();
+  }
+
+  /**
+   * Test the base use case.
+   */
+  @Test
+  public void testNormalCase() throws InterruptedException, YarnException {
+    // Set the first app to be created two seconds ago; it should be removed
+    ApplicationId appId = appIds.get(0);
+    stateStore.setApplicationCreateTime(appId,
+        System.currentTimeMillis() - 2000);
+
+    appCleaner.run();
+
+    Assert.assertEquals(2,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+  }
+
+  /**
+   * Running applications should not be removed.
+   */
+  @Test
+  public void testRunningApps() throws InterruptedException, YarnException {
+    // Set first app to be created two seconds ago, but still running
+    ApplicationId appId = appIds.get(0);
+    stateStore.setApplicationCreateTime(appId,
+        System.currentTimeMillis() - 2000);
+
+    runningAppIds.add(appId);
+
+    // Set second app to be created three seconds ago, and not running
+    appId = appIds.get(1);
+    stateStore.setApplicationCreateTime(appId,
+        System.currentTimeMillis() - 3000);
+
+    appCleaner.run();
+
+    Assert.assertEquals(2,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+  }
+
+  /**
+   * Testable version of ApplicationCleaner.
+   */
+  public class TestableApplicationCleaner extends ApplicationCleaner {
+
+    public TestableApplicationCleaner(Configuration conf,
+        GPGContext gpgContext) throws YarnException {
+      super(conf, gpgContext);
+    }
+
+    @Override
+    public Set<ApplicationId> getAppsFromRouter(
+        Set<YarnApplicationState> state) throws YarnRuntimeException {
+      return runningAppIds;
+    }
+  }
+
+}
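Reviewer note (illustrative only, not part of the patch): a minimal sketch of how the new ApplicationCleaner settings might be enabled when starting the GPG. The class name and the interval/expiration values below are made-up examples; it assumes GlobalPolicyGenerator is driven through the standard Hadoop Service init/start lifecycle, as the serviceInit/serviceStart changes above suggest.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator;

public class GpgAppCleanerExample {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // A positive interval enables the cleaner (the default of -1 keeps it off).
    conf.setLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
        TimeUnit.MINUTES.toMillis(30));
    // Delete non-running applications created more than two weeks ago.
    conf.setLong(YarnConfiguration.GPG_APPCLEANER_CREATION_EXPIRATION_MS,
        TimeUnit.DAYS.toMillis(14));
    // Require 3 successful Router queries out of at most 10 attempts
    // before trusting the running-application list.
    conf.setInt(YarnConfiguration.GPG_APPCLEANER_MIN_ROUTER_SUCCESS, 3);
    conf.setInt(YarnConfiguration.GPG_APPCLEANER_MAX_ROUTER_RETRY, 10);

    // serviceInit constructs the ApplicationCleaner; serviceStart schedules it.
    GlobalPolicyGenerator gpg = new GlobalPolicyGenerator();
    gpg.init(conf);
    gpg.start();
  }
}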