appsInfo);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationInfoPBImpl.java
new file mode 100644
index 0000000..565ae87
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationInfoPBImpl.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol buffer based implementation of {@link ApplicationInfo}.
+ */
+@Private
+@Unstable
+public class ApplicationInfoPBImpl extends ApplicationInfo {
+
+ private ApplicationInfoProto proto =
+ ApplicationInfoProto.getDefaultInstance();
+ private ApplicationInfoProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private ApplicationId applicationId = null;
+ private SubClusterId homeSubCluster = null;
+
+ public ApplicationInfoPBImpl() {
+ builder = ApplicationInfoProto.newBuilder();
+ }
+
+ public ApplicationInfoPBImpl(ApplicationInfoProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ApplicationInfoProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ApplicationInfoProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.applicationId != null) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+ if (this.homeSubCluster != null) {
+ builder.setHomeSubCluster(convertToProtoFormat(this.homeSubCluster));
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ ApplicationInfoProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return applicationId;
+ }
+
+ @Override
+ public void setApplicationId(ApplicationId applicationId) {
+ maybeInitBuilder();
+ this.applicationId = applicationId;
+ if (applicationId == null) {
+ builder.clearApplicationId();
+ return;
+ }
+ }
+
+ @Override
+ public SubClusterId getHomeSubCluster() {
+ ApplicationInfoProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.homeSubCluster != null) {
+ return this.homeSubCluster;
+ }
+ if (!p.hasHomeSubCluster()) {
+ return null;
+ }
+ this.homeSubCluster = convertFromProtoFormat(p.getHomeSubCluster());
+ return this.homeSubCluster;
+ }
+
+ @Override
+ public void setHomeSubCluster(SubClusterId homeSubCluster) {
+ maybeInitBuilder();
+ if (homeSubCluster == null) {
+ builder.clearHomeSubCluster();
+ }
+ this.homeSubCluster = homeSubCluster;
+ }
+
+ @Override
+ public long getCreateTime() {
+ ApplicationInfoProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getCreateTime();
+ }
+
+ @Override
+ public void setCreateTime(long createTime) {
+ maybeInitBuilder();
+ builder.setCreateTime(createTime);
+ }
+
+ private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) {
+ return new SubClusterIdPBImpl(subClusterId);
+ }
+
+ private SubClusterIdProto convertToProtoFormat(SubClusterId subClusterId) {
+ return ((SubClusterIdPBImpl) subClusterId).getProto();
+ }
+
+ private ApplicationId convertFromProtoFormat(ApplicationIdProto appId) {
+ return new ApplicationIdPBImpl(appId);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId appId) {
+ return ((ApplicationIdPBImpl) appId).getProto();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoRequestPBImpl.java
new file mode 100644
index 0000000..8615b77
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoRequestPBImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsInfoRequestProto;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol buffer based implementation of
+ * {@link GetApplicationsInfoRequest}.
+ */
+@Private
+@Unstable
+public class GetApplicationsInfoRequestPBImpl
+ extends GetApplicationsInfoRequest {
+
+ private GetApplicationsInfoRequestProto proto =
+ GetApplicationsInfoRequestProto.getDefaultInstance();
+ private GetApplicationsInfoRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ public GetApplicationsInfoRequestPBImpl() {
+ builder = GetApplicationsInfoRequestProto.newBuilder();
+ }
+
+ public GetApplicationsInfoRequestPBImpl(
+ GetApplicationsInfoRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public GetApplicationsInfoRequestProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoResponsePBImpl.java
new file mode 100644
index 0000000..bc24d2c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsInfoResponsePBImpl.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsInfoResponseProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsInfoResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol buffer based implementation of
+ * {@link GetApplicationsInfoResponse}.
+ */
+@Private
+@Unstable
+public class GetApplicationsInfoResponsePBImpl
+ extends GetApplicationsInfoResponse {
+
+ private GetApplicationsInfoResponseProto proto =
+ GetApplicationsInfoResponseProto.getDefaultInstance();
+ private GetApplicationsInfoResponseProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private List<ApplicationInfo> appsInfo;
+
+ public GetApplicationsInfoResponsePBImpl() {
+ builder = GetApplicationsInfoResponseProto.newBuilder();
+ }
+
+ public GetApplicationsInfoResponsePBImpl(
+ GetApplicationsInfoResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public GetApplicationsInfoResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = GetApplicationsInfoResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.appsInfo != null) {
+ addAppsInfoToProto();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public List<ApplicationInfo> getAppsInfo() {
+ initSubClustersInfoList();
+ return appsInfo;
+ }
+
+ @Override
+ public void setAppsInfo(List<ApplicationInfo> appsInfo) {
+ maybeInitBuilder();
+ if (appsInfo == null) {
+ builder.clearAppsInfoList();
+ return;
+ }
+ this.appsInfo = appsInfo;
+ }
+
+ private void initSubClustersInfoList() {
+ if (this.appsInfo != null) {
+ return;
+ }
+ GetApplicationsInfoResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ApplicationInfoProto> appInfosList = p.getAppsInfoListList();
+ appsInfo = new ArrayList<>();
+
+ for (ApplicationInfoProto r : appInfosList) {
+ appsInfo.add(convertFromProtoFormat(r));
+ }
+ }
+
+ private void addAppsInfoToProto() {
+ maybeInitBuilder();
+ builder.clearAppsInfoList();
+ if (appsInfo == null) {
+ return;
+ }
+ Iterable<ApplicationInfoProto> iterable =
+ new Iterable<ApplicationInfoProto>() {
+ @Override
+ public Iterator<ApplicationInfoProto> iterator() {
+ return new Iterator<ApplicationInfoProto>() {
+
+ private Iterator<ApplicationInfo> iter = appsInfo.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ApplicationInfoProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+
+ }
+
+ };
+ builder.addAllAppsInfoList(iterable);
+ }
+
+ private ApplicationInfo convertFromProtoFormat(ApplicationInfoProto sc) {
+ return new ApplicationInfoPBImpl(sc);
+ }
+
+ private ApplicationInfoProto convertToProtoFormat(ApplicationInfo sc) {
+ return ((ApplicationInfoPBImpl) sc).getProto();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index ef77114..ff17364 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
@@ -420,6 +421,21 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
}
/**
+ * Delete the mapping of home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Currently response is empty if the operation is
+ * successful, if not an exception reporting reason for a failure.
+ *
+ * @param applicationId the application to delete the home sub-cluster of
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+ throws YarnException {
+ stateStore.deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+ return;
+ }
+
+ /**
* Get the singleton instance of SubClusterResolver.
*
* @return SubClusterResolver instance
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
index cedf482..67fb52e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
@@ -97,6 +97,12 @@ message ApplicationHomeSubClusterProto {
optional SubClusterIdProto home_sub_cluster = 2;
}
+message ApplicationInfoProto {
+ optional ApplicationIdProto application_id = 1;
+ optional SubClusterIdProto home_sub_cluster = 2;
+ optional int64 create_time = 3;
+}
+
message AddApplicationHomeSubClusterRequestProto {
optional ApplicationHomeSubClusterProto app_subcluster_map = 1;
}
@@ -128,6 +134,13 @@ message GetApplicationsHomeSubClusterResponseProto {
repeated ApplicationHomeSubClusterProto app_subcluster_map = 1;
}
+message GetApplicationsInfoRequestProto {
+
+}
+
+message GetApplicationsInfoResponseProto {
+ repeated ApplicationInfoProto apps_info_list = 1;
+}
message DeleteApplicationHomeSubClusterRequestProto {
optional ApplicationIdProto application_id = 1;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 15cc0f0..ded1eff 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -516,7 +516,7 @@ private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
}
- private void addApplicationHomeSC(ApplicationId appId,
+ protected void addApplicationHomeSC(ApplicationId appId,
SubClusterId subClusterId) throws YarnException {
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
index 289a3a6..9958492 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
@@ -23,14 +23,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * HSQLDB implementation of {@link FederationStateStore}.
+ * HSQLDB implementation of {@link org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore}.
*/
-public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+public class HSQLDBFederationStateStore extends SQLFederationGPGStateStore {
private static final Logger LOG =
LoggerFactory.getLogger(HSQLDBFederationStateStore.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationGPGStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationGPGStateStore.java
new file mode 100644
index 0000000..baef886
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationGPGStateStore.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for SQLFederationGPGStateStore.
+ */
+public class TestSQLFederationGPGStateStore
+ extends FederationStateStoreBaseTest {
+
+ private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource";
+ private static final String DATABASE_URL = "jdbc:hsqldb:mem:state";
+ private static final String DATABASE_USERNAME = "SA";
+ private static final String DATABASE_PASSWORD = "";
+
+ private FederationGPGStateStore sqlStateStore;
+
+ @Override
+ protected FederationStateStore createStateStore() {
+
+ YarnConfiguration conf = new YarnConfiguration();
+
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS,
+ HSQLDB_DRIVER);
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME,
+ DATABASE_USERNAME);
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD,
+ DATABASE_PASSWORD);
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
+ DATABASE_URL + System.currentTimeMillis());
+ super.setConf(conf);
+
+ sqlStateStore = new HSQLDBFederationStateStore();
+ return sqlStateStore;
+ }
+
+ @Test
+ public void testGetApplicationsInfo() throws Exception {
+ ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+ ApplicationInfo ai1 = ApplicationInfo.newInstance(appId1, subClusterId1, 0);
+
+ ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+ SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+ ApplicationInfo ai2 = ApplicationInfo.newInstance(appId2, subClusterId2, 0);
+
+ addApplicationHomeSC(appId1, subClusterId1);
+ addApplicationHomeSC(appId2, subClusterId2);
+
+ GetApplicationsInfoRequest getRequest =
+ GetApplicationsInfoRequest.newInstance();
+
+ GetApplicationsInfoResponse result =
+ sqlStateStore.getApplicationsInfo(getRequest);
+
+ Assert.assertEquals(2, result.getAppsInfo().size());
+ Assert.assertTrue(result.getAppsInfo().contains(ai1));
+ Assert.assertTrue(result.getAppsInfo().contains(ai2));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
new file mode 100644
index 0000000..b26a152
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * GPGUtils contains utility functions for the GPG.
+ *
+ */
+public final class GPGUtils {
+
+ // hide constructor
+ private GPGUtils() {
+ }
+
+ /**
+ * Performs an invocation of the the remote RMWebService.
+ */
+ public static <T> T invokeRMWebService(Configuration conf, String webAddr,
+ String path, final Class<T> returnType) {
+ Client client = Client.create();
+ T obj = null;
+
+ WebResource webResource = client.resource(webAddr);
+ ClientResponse response = webResource.path("ws/v1/cluster").path(path)
+ .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+ if (response.getStatus() == HttpServletResponse.SC_OK) {
+ obj = response.getEntity(returnType);
+ } else {
+ throw new YarnRuntimeException("Bad response from remote web service: "
+ + response.getStatus());
+ }
+ return obj;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index f6cfba0..2f375ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
+ private ApplicationCleaner applicationCleaner;
public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
@@ -78,6 +80,7 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+ this.applicationCleaner = new ApplicationCleaner(conf, this.gpgContext);
DefaultMetricsSystem.initialize(METRICS_NAME);
@@ -89,7 +92,7 @@ protected void serviceInit(Configuration conf) throws Exception {
protected void serviceStart() throws Exception {
super.serviceStart();
- // Scheduler SubClusterCleaner service
+ // Schedule the SubClusterCleaner service
long scCleanerIntervalMs = getConfig().getLong(
YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
@@ -99,6 +102,17 @@ protected void serviceStart() throws Exception {
LOG.info("Scheduled sub-cluster cleaner with interval: {}",
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
+
+ // Schedule the ApplicationCleaner service
+ long appCleanerIntervalMs =
+ getConfig().getLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS);
+ if (appCleanerIntervalMs > 0) {
+ this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
+ 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled application cleaner with interval: {}",
+ DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
+ }
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
new file mode 100644
index 0000000..5249ab8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationGPGStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsInfoResponse;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * table applicationsHomeSubCluster in FederationStateStore. Specifically,
+ * applications that meet both of the following conditions are removed:
+ *
+ * 1. It is not in running state
+ *
+ * 2. Its appCreation time stamp is older than some threshold
+ */
+public class ApplicationCleaner implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationCleaner.class);
+
+  private Configuration conf;
+  private GPGContext gpgContext;
+  // Applications created more than this many ms ago are deletion candidates
+  // (unless they are still running).
+  private long creationExpirationMillis;
+
+  // Router query retry policy: require at least minRouterSuccessCount
+  // successful queries within maxRouterRetry attempts.
+  private int minRouterSuccessCount;
+  private int maxRouterRetry;
+  private long routerQueryIntervalMillis;
+
+  private FederationGPGStateStore stateStore;
+
+  public ApplicationCleaner(Configuration conf, GPGContext gpgContext)
+      throws YarnException {
+
+    this.gpgContext = gpgContext;
+    this.conf = conf;
+
+    this.creationExpirationMillis =
+        conf.getLong(YarnConfiguration.GPG_APPCLEANER_CREATION_EXPIRATION_MS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CREATION_EXPIRATION_MS);
+
+    this.minRouterSuccessCount =
+        conf.getInt(YarnConfiguration.GPG_APPCLEANER_MIN_ROUTER_SUCCESS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_MIN_ROUTER_SUCCESS);
+
+    this.maxRouterRetry =
+        conf.getInt(YarnConfiguration.GPG_APPCLEANER_MAX_ROUTER_RETRY,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_MAX_ROUTER_RETRY);
+
+    this.routerQueryIntervalMillis =
+        conf.getLong(YarnConfiguration.GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS);
+
+    // Validate the configuration before creating the state store client, so
+    // that a bad configuration does not leak an initialized store instance.
+    if (this.minRouterSuccessCount > this.maxRouterRetry) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount
+          + " should not be larger than maxRouterRetry "
+          + this.maxRouterRetry);
+    }
+    if (this.minRouterSuccessCount <= 0) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount + " should be positive");
+    }
+
+    this.stateStore = (FederationGPGStateStore) FederationStateStoreFacade
+        .createRetryInstance(this.conf,
+            YarnConfiguration.GPG_STATESTORE_CLIENT_CLASS,
+            YarnConfiguration.DEFAULT_GPG_STATESTORE_CLIENT_CLASS,
+            FederationGPGStateStore.class,
+            FederationStateStoreFacade.createRetryPolicy(conf));
+    this.stateStore.init(conf);
+
+    LOG.info(
+        "Initialized AppCleaner with application creation expiration of {}",
+        DurationFormatUtils.formatDurationISO(this.creationExpirationMillis));
+    LOG.info(
+        "Router query with min success {}, max retry {}, retry interval {}",
+        this.minRouterSuccessCount, this.maxRouterRetry,
+        DurationFormatUtils.formatDurationISO(this.routerQueryIntervalMillis));
+  }
+
+  @VisibleForTesting
+  public void setGpgStateStore(FederationGPGStateStore gpgStateStore)
+      throws Exception {
+    // Close the store created in the constructor before swapping in the mock
+    if (this.stateStore != null) {
+      this.stateStore.close();
+    }
+    this.stateStore = gpgStateStore;
+  }
+
+  /**
+   * Query the Router for all applications currently in the given states.
+   *
+   * @param state application states to look for
+   * @return the set of application ids in those states
+   * @throws YarnRuntimeException when the Router call fails
+   */
+  public Set<ApplicationId> getAppsFromRouter(Set<YarnApplicationState> state)
+      throws YarnRuntimeException {
+    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+    LOG.info("Contacting router at: {}", webAppAddress);
+    AppsInfo appsInfo = (AppsInfo) GPGUtils.invokeRMWebService(conf,
+        webAppAddress, "apps", AppsInfo.class);
+
+    Set<ApplicationId> appSet = new HashSet<>();
+    for (AppInfo appInfo : appsInfo.getApps()) {
+      if (state.contains(appInfo.getState())) {
+        appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+      }
+    }
+    return appSet;
+  }
+
+  /**
+   * Get the set of running applications as known by the Router.
+   *
+   * The Router is queried up to maxRouterRetry times, and the union of at
+   * least minRouterSuccessCount successful responses is required, to guard
+   * against a single Router response having an incomplete view.
+   *
+   * @return the set of running application ids
+   * @throws YarnException if not enough Router queries succeed
+   */
+  public Set<ApplicationId> getRunningApplications() throws YarnException {
+    int successCount = 0;
+    int totalAttemptCount = 0;
+    Set<ApplicationId> resultSet = new HashSet<>();
+    while (totalAttemptCount < this.maxRouterRetry) {
+      try {
+        Set<ApplicationId> runningApps =
+            getAppsFromRouter(EnumSet.of(YarnApplicationState.RUNNING));
+        resultSet.addAll(runningApps);
+        LOG.info("Attempt {}: {} running apps from Router, {} in total",
+            totalAttemptCount, runningApps.size(), resultSet.size());
+
+        successCount++;
+        if (successCount >= this.minRouterSuccessCount) {
+          return resultSet;
+        }
+
+        // Wait for the next attempt
+        try {
+          Thread.sleep(this.routerQueryIntervalMillis);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so the caller can observe it
+          Thread.currentThread().interrupt();
+          LOG.warn("Sleep interrupted after attempt " + totalAttemptCount);
+        }
+      } catch (Exception e) {
+        LOG.warn("Router query attempt " + totalAttemptCount + " failed ", e);
+      } finally {
+        totalAttemptCount++;
+      }
+    }
+    throw new YarnException("Only " + successCount
+        + " success Router queries after " + totalAttemptCount + " retries");
+  }
+
+  @Override
+  public void run() {
+
+    Date now = new Date();
+    LOG.info("Application cleaner run at local time {}", now);
+
+    Set<ApplicationId> appIdToDelete = new HashSet<>();
+
+    try {
+      GetApplicationsInfoResponse response = this.stateStore
+          .getApplicationsInfo(GetApplicationsInfoRequest.newInstance());
+      LOG.info("Statestore has {} apps in total",
+          response.getAppsInfo().size());
+
+      for (ApplicationInfo appInfo : response.getAppsInfo()) {
+        if (appInfo.getCreateTime() == 0) {
+          // A zero create time means the record was written without a
+          // timestamp; skip it rather than risk deleting a live entry.
+          LOG.error("Application {} has a zero create time",
+              appInfo.getApplicationId());
+          continue;
+        }
+        Date startTime = new Date(appInfo.getCreateTime());
+        long diffMs = now.getTime() - startTime.getTime();
+
+        if (diffMs > this.creationExpirationMillis) {
+          appIdToDelete.add(appInfo.getApplicationId());
+          LOG.info("Candidate for deletion {}: start time {}, {} from now",
+              appInfo.getApplicationId(), startTime.toString(),
+              DurationFormatUtils.formatDurationISO(diffMs));
+        }
+      }
+      LOG.info("{} candidates for deletion", appIdToDelete.size());
+
+      Set<ApplicationId> runningApps = getRunningApplications();
+      LOG.info("{} running applications in the cluster", runningApps.size());
+      for (ApplicationId appId : runningApps) {
+        LOG.debug("Running application {} in the cluster", appId.toString());
+      }
+
+      // Never delete an application that is still running, however old
+      appIdToDelete = Sets.difference(appIdToDelete, runningApps);
+      LOG.info("Deleting {} applications from statestore",
+          appIdToDelete.size());
+      for (ApplicationId appId : appIdToDelete) {
+        try {
+          this.gpgContext.getStateStoreFacade()
+              .deleteApplicationHomeSubCluster(appId);
+        } catch (Exception e) {
+          // Best effort per application: keep cleaning the rest
+          LOG.error(
+              "deleteApplicationHomeSubCluster failed at application " + appId,
+              e);
+        }
+      }
+
+    } catch (Throwable e) {
+      // Catch everything: an escaping exception would cancel the periodic
+      // schedule in ScheduledExecutorService.scheduleAtFixedRate
+      LOG.error("Application cleaner started at time " + now + " fails: ", e);
+    }
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
new file mode 100644
index 0000000..dd302c8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestApplicationCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestApplicationCleaner.java
new file mode 100644
index 0000000..1b827e7
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestApplicationCleaner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationGPGStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for Application Cleaner in GPG.
+ */
+public class TestApplicationCleaner {
+  private Configuration conf;
+  private MemoryFederationGPGStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private ApplicationCleaner appCleaner;
+  private GPGContext gpgContext;
+
+  // The applications registered in the state store during setup
+  private List<ApplicationId> appIds;
+  // The list of running applications returned by mocked router
+  private Set<ApplicationId> runningAppIds;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // application expires in one second
+    conf.setLong(YarnConfiguration.GPG_APPCLEANER_CREATION_EXPIRATION_MS, 1000);
+
+    // No wait for Router query retry
+    conf.setLong(YarnConfiguration.GPG_APPCLEANER_ROUTER_RETRY_INTEVAL_MS, 0);
+
+    stateStore = new MemoryFederationGPGStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    appCleaner = new TestableApplicationCleaner(conf, gpgContext);
+    appCleaner.setGpgStateStore(stateStore);
+
+    runningAppIds = new HashSet<>();
+
+    // Register three applications, each homed at its own sub-cluster
+    appIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      appIds.add(appId);
+
+      SubClusterId subClusterId = SubClusterId.newInstance("SUBCLUSTER-" + i);
+
+      stateStore.addApplicationHomeSubCluster(
+          AddApplicationHomeSubClusterRequest.newInstance(
+              ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+    }
+  }
+
+  @After
+  public void breakDown() throws Exception {
+    stateStore.close();
+  }
+
+  /**
+   * Test the base use case.
+   */
+  @Test
+  public void testNormalCase() throws InterruptedException, YarnException {
+    // Set first app to be created two seconds ago, and thus should be removed
+    ApplicationId appId = appIds.get(0);
+    stateStore.setApplicationCreateTime(appId,
+        System.currentTimeMillis() - 2000);
+
+    appCleaner.run();
+
+    // Only the expired (and non-running) app is removed; two remain
+    Assert.assertEquals(2,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+  }
+
+  /**
+   * Running applications should not be removed.
+   */
+  @Test
+  public void testRunningApps() throws InterruptedException, YarnException {
+    // Set first app to be created two seconds ago, but still running
+    ApplicationId appId = appIds.get(0);
+    stateStore.setApplicationCreateTime(appId,
+        System.currentTimeMillis() - 2000);
+
+    runningAppIds.add(appId);
+
+    // Set second app to be created three seconds ago, and not running
+    appId = appIds.get(1);
+    stateStore.setApplicationCreateTime(appId,
+        System.currentTimeMillis() - 3000);
+
+    appCleaner.run();
+
+    // The running app survives despite being expired; only app 1 is removed
+    Assert.assertEquals(2,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+  }
+
+  /**
+   * Testable version of ApplicationCleaner that mocks the Router query to
+   * return the locally maintained running-application set.
+   */
+  public class TestableApplicationCleaner extends ApplicationCleaner {
+
+    public TestableApplicationCleaner(Configuration conf, GPGContext gpgContext)
+        throws YarnException {
+      super(conf, gpgContext);
+    }
+
+    // Raw types kept deliberately so this overrides the superclass method
+    // regardless of whether its signature is raw or generified
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Set getAppsFromRouter(Set state) throws YarnRuntimeException {
+      return runningAppIds;
+    }
+  }
+
+}