diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 7c06256..0895b77 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -68,6 +68,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* In-memory implementation of {@link FederationStateStore}.
*/
@@ -158,6 +160,19 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(
return SubClusterHeartbeatResponse.newInstance();
}
+ @VisibleForTesting
+ public void setSubClusterLastHeartbeat(SubClusterId subClusterId,
+ long lastHeartbeat) throws YarnException {
+ GetSubClusterInfoResponse response =
+ getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId));
+ SubClusterInfo subClusterInfo = response.getSubClusterInfo();
+ if (subClusterInfo == null) {
+      throw new YarnException(
+          "Subcluster " + subClusterId + " does not exist");
+ }
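+    // Relies on the in-memory store returning the stored SubClusterInfo
+    // reference, so this mutation updates the membership record in place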
+ subClusterInfo.setLastHeartBeat(lastHeartbeat);
+ }
+
@Override
public GetSubClusterInfoResponse getSubCluster(
GetSubClusterInfoRequest request) throws YarnException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 682eb14..ef77114 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -62,9 +62,11 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -221,6 +223,22 @@ public static FederationStateStoreFacade getInstance() {
}
/**
+   * Deregisters the sub-cluster identified by {@code subClusterId}, updating
+   * its state in the federation membership. This can be used to mark a
+   * sub-cluster as lost, deregistered, or decommissioned.
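+   * For example, the sub-cluster cleaner marks expired members with
+   * {@code deregisterSubCluster(subClusterId, SubClusterState.SC_LOST)}.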
+ *
+ * @param subClusterId the target subclusterId
+ * @param subClusterState the state to update it to
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void deregisterSubCluster(SubClusterId subClusterId,
+ SubClusterState subClusterState) throws YarnException {
+ stateStore.deregisterSubCluster(
+ SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState));
+ }
+
+ /**
* Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
* @param subClusterId the identifier of the sub-cluster
@@ -255,8 +273,7 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
final boolean flushCache) throws YarnException {
if (flushCache && isCachingEnabled()) {
- LOG.info("Flushing subClusters from cache and rehydrating from store,"
- + " most likely on account of RM failover.");
+ LOG.info("Flushing subClusters from cache and rehydrating from store.");
cache.remove(buildGetSubClustersCacheRequest(false));
}
return getSubCluster(subClusterId);
@@ -287,6 +304,26 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
}
/**
+   * Updates the cache from the central {@link FederationStateStore} and
+   * returns the {@link SubClusterInfo} of all known sub-clusters, optionally
+   * filtered down to the active ones.
+ *
+ * @param filterInactiveSubClusters whether to filter out inactive
+ * sub-clusters
+ * @param flushCache flag to indicate if the cache should be flushed or not
+ * @return the sub cluster information
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(
+ final boolean filterInactiveSubClusters, final boolean flushCache)
+ throws YarnException {
+ if (flushCache && isCachingEnabled()) {
+ LOG.info("Flushing subClusters from cache and rehydrating from store.");
+ cache.remove(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+ }
+ return getSubClusters(filterInactiveSubClusters);
+ }
+
+ /**
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
*
* @param queue the queue whose policy is required
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
new file mode 100644
index 0000000..823e102
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * GPGUtils contains utility functions for the GPG.
+ */
+public final class GPGUtils {
+
+ // hide constructor
+ private GPGUtils() {
+ }
+
+ /**
+ * Converts the given millisecond duration into a formatted string.
+ *
+   * @param millis the duration in milliseconds
+ * @return String representation of the millisecond duration formatted like
+ * XXD:XXH:XXM:XXS:XXXMS
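+   *         (for example, {@code getDurationString(90061001)} returns
+   *         "1D:01H:01M:01S:001MS")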
+ */
+ public static String getDurationString(long millis) {
+ if (millis < 0) {
+ throw new IllegalArgumentException("Duration must be non-negative!");
+ }
+
+ long days = TimeUnit.MILLISECONDS.toDays(millis);
+ millis -= TimeUnit.DAYS.toMillis(days);
+ long hours = TimeUnit.MILLISECONDS.toHours(millis);
+ millis -= TimeUnit.HOURS.toMillis(hours);
+ long minutes = TimeUnit.MILLISECONDS.toMinutes(millis);
+ millis -= TimeUnit.MINUTES.toMillis(minutes);
+ long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);
+ millis -= TimeUnit.SECONDS.toMillis(seconds);
+
+ StringBuilder sb = new StringBuilder(64);
+ sb.append(days);
+ sb.append("D:");
+ sb.append(String.format("%02d", hours));
+ sb.append("H:");
+ sb.append(String.format("%02d", minutes));
+ sb.append("M:");
+ sb.append(String.format("%02d", seconds));
+ sb.append("S:");
+ sb.append(String.format("%03d", millis));
+ sb.append("MS");
+
+    return sb.toString();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index c1f7460..e8abbf6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -46,6 +46,24 @@
public static final Logger LOG =
LoggerFactory.getLogger(GlobalPolicyGenerator.class);
+ public static final String FEDERATION_PREFIX = "yarn.federation.";
+
+ // -------------- Federation GlobalPolicyGenerator configuration -----------//
+
+ private static final String FEDERATION_GPG_PREFIX =
+ FEDERATION_PREFIX + "globalpolicygenerator.";
+
+  // The interval at which the subcluster cleaner runs, default is one minute
+ public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
+ FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms";
+ public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
+ 60000;
+
+ // The expiration time for a subcluster heartbeat, default is 30 minutes
+ public static final String GPG_SUBCLUSTER_EXPIRATION_MS =
+ FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
+ public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
+
// YARN Variables
private static CompositeServiceShutdownHook gpgShutdownHook;
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
@@ -54,6 +72,7 @@
// Federation Variables
private GPGContext gpgContext;
+ private SubClusterCleanerService scCleanerService;
public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
@@ -85,6 +104,9 @@ protected void serviceInit(Configuration conf) throws Exception {
this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance());
+ this.scCleanerService = new SubClusterCleanerService(this.gpgContext);
+ addService(this.scCleanerService);
+
DefaultMetricsSystem.initialize(METRICS_NAME);
// super.serviceInit after all services are added
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleaner.java
new file mode 100644
index 0000000..7e7e881
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleaner.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A runnable invoked periodically to change the status of inactive
+ * sub-clusters (ones that have not sent a heartbeat for a while) to lost in
+ * the Federation State Store.
+ */
+public class SubClusterCleaner implements Runnable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SubClusterCleaner.class);
+
+ private GPGContext gpgContext;
+ private long heartbeatExpirationMillis;
+
+ private SimpleDateFormat localTimeFormat;
+ private SimpleDateFormat utcFormat;
+
+  /**
+   * The sub-cluster cleaner runnable is invoked by the sub-cluster cleaner
+   * service to check the membership table and mark sub-clusters that have not
+   * sent a heartbeat within the expiration window as lost.
+   *
+   * @param conf the configuration to read the expiration setting from
+   * @param gpgContext the GPG context holding the state store facade
+   */
+ public SubClusterCleaner(Configuration conf, GPGContext gpgContext) {
+ this.heartbeatExpirationMillis =
+ conf.getLong(GlobalPolicyGenerator.GPG_SUBCLUSTER_EXPIRATION_MS,
+ GlobalPolicyGenerator.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS);
+ this.gpgContext = gpgContext;
+ this.localTimeFormat = new SimpleDateFormat();
+ this.utcFormat = new SimpleDateFormat();
+ this.utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}",
+ GPGUtils.getDurationString(this.heartbeatExpirationMillis));
+ }
+
+ @Override
+ public void run() {
+ try {
+ Date now = new Date();
+      // The timestamp "now" would have if the date in the state store were
+      // written in local time rather than UTC
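+      // (e.g. in a UTC+8 zone the local wall clock reads 8 hours ahead of
+      // UTC, so nowLocalParsedAsUtc lands 8 hours after now)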
+ Date nowLocalParsedAsUtc =
+ this.utcFormat.parse(this.localTimeFormat.format(now));
+ LOG.info(
+ "SubClusterCleaner at nowLocalParsedAsUtc time {}, local time {}",
+ nowLocalParsedAsUtc, now);
+
+      Map<SubClusterId, SubClusterInfo> infoMap = this.gpgContext
+          .getStateStoreFacade().getSubClusters(false, true);
+
+ // Iterate over each sub cluster and check last heartbeat
+      for (Map.Entry<SubClusterId, SubClusterInfo> entry
+          : infoMap.entrySet()) {
+ SubClusterInfo subClusterInfo = entry.getValue();
+
+ Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat());
+ LOG.info("Checking subcluster {} in state {}, last heartbeat at {}",
+ subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
+ lastHeartBeat);
+
+ if (!subClusterInfo.getState().isUnusable()) {
+ boolean shouldDeregister = false;
+
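+          // Time remaining before the last heartbeat is considered expired;
+          // non-positive means the expiration window has already passed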
+ long timeUntilDeregister = this.heartbeatExpirationMillis
+ - (now.getTime() - lastHeartBeat.getTime());
+ if (timeUntilDeregister <= 0) {
+ shouldDeregister = true;
+          /*
+           * Robustness check: it is possible that local time rather than UTC
+           * was written into the state store. Do not deregister the
+           * sub-cluster if lastHeartBeat falls within the window of local
+           * time parsed as UTC.
+           */
+ timeUntilDeregister = this.heartbeatExpirationMillis
+ - (nowLocalParsedAsUtc.getTime() - lastHeartBeat.getTime());
+ if (timeUntilDeregister >= 0
+ && timeUntilDeregister <= this.heartbeatExpirationMillis) {
+ LOG.error(
+ "State store lastHeartBeat is in local time {}"
+ + " for subcluster {}, skipping for robustness",
+ lastHeartBeat.toString(), entry.getKey());
+ shouldDeregister = false;
+ }
+ }
+
+ // Deregister subcluster as SC_LOST if last heartbeat too old
+ if (shouldDeregister) {
+ LOG.warn("Deregistering subcluster {}: last heartbeat at {}",
+ subClusterInfo.getSubClusterId(),
+ new Date(subClusterInfo.getLastHeartBeat()));
+ try {
+ this.gpgContext.getStateStoreFacade().deregisterSubCluster(
+ subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST);
+ } catch (Exception e) {
+ LOG.error("deregisterSubCluster failed on subcluster "
+ + subClusterInfo.getSubClusterId(), e);
+ }
+ } else {
+ LOG.info("Time until deregister for subcluster {}: {}",
+ entry.getKey(),
+ GPGUtils.getDurationString(timeUntilDeregister));
+ }
+ }
+ }
+ } catch (Throwable e) {
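+      // Catch Throwable: an exception escaping run() would cancel all
+      // subsequent executions scheduled via scheduleAtFixedRate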
+ LOG.error("Subcluster cleaner fails: ", e);
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleanerService.java
new file mode 100644
index 0000000..1960800
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleanerService.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The sub-cluster cleaner is one of the GPG's services that periodically
+ * checks the membership table and marks sub-clusters that have not sent a
+ * heartbeat within a certain amount of time as LOST.
+ */
+public class SubClusterCleanerService extends AbstractService {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SubClusterCleanerService.class);
+
+ private GPGContext gpgContext;
+ private long cleanerIntervalMillis;
+ private SubClusterCleaner subClusterCleaner;
+ private ScheduledExecutorService scheduledExecutorService;
+
+ public SubClusterCleanerService(GPGContext gpgContext) {
+ super(SubClusterCleanerService.class.getName());
+ this.gpgContext = gpgContext;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ // Load parameters from the configuration
+ this.cleanerIntervalMillis =
+ conf.getLong(GlobalPolicyGenerator.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
+ GlobalPolicyGenerator.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
+ // Set up the runnable
+ this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ // Begin the sub cluster cleaner scheduled execution
+ this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
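+    // A single thread suffices since only the one cleaner runnable is
+    // scheduled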
+ this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner, 0,
+ this.cleanerIntervalMillis, TimeUnit.MILLISECONDS);
+ LOG.info("Started federation sub cluster cleaner with interval: {}",
+ GPGUtils.getDurationString(this.cleanerIntervalMillis));
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ // Attempt to stop the sub cluster cleaner scheduled execution
+ try {
+ if (this.scheduledExecutorService != null
+ && !this.scheduledExecutorService.isShutdown()) {
+ this.scheduledExecutorService.shutdown();
+ LOG.info("Stopped federation sub cluster cleaner");
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to shutdown ScheduledExecutorService", e);
+ throw e;
+ }
+ super.serviceStop();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestSubClusterCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestSubClusterCleaner.java
new file mode 100644
index 0000000..36df78f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestSubClusterCleaner.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for Sub-cluster Cleaner in GPG.
+ */
+public class TestSubClusterCleaner {
+
+ private Configuration conf;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreFacade facade;
+ private SubClusterCleaner cleaner;
+ private GPGContext gpgContext;
+
+  private ArrayList<SubClusterId> subClusterIds;
+
+ @Before
+ public void setup() throws YarnException {
+ conf = new YarnConfiguration();
+
+ // subcluster expires in one second
+ conf.setLong(GlobalPolicyGenerator.GPG_SUBCLUSTER_EXPIRATION_MS, 1000);
+
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+
+ facade = FederationStateStoreFacade.getInstance();
+ facade.reinitialize(stateStore, conf);
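+    // Point the shared facade singleton at the fresh in-memory store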
+
+ gpgContext = new GPGContextImpl();
+ gpgContext.setStateStoreFacade(facade);
+
+ cleaner = new SubClusterCleaner(conf, gpgContext);
+
+    // Create and register three sub-clusters
+    subClusterIds = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ // Create sub cluster id and info
+ SubClusterId subClusterId =
+ SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i));
+
+ SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+ "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4",
+ SubClusterState.SC_RUNNING, System.currentTimeMillis(), "");
+ // Register the sub cluster
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ // Append the id to a local list
+ subClusterIds.add(subClusterId);
+ }
+ }
+
+ @After
+ public void breakDown() throws Exception {
+ stateStore.close();
+ }
+
+ @Test
+ public void testSubClusterRegisterHeartBeatTime() throws YarnException {
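+    // All three sub-clusters just registered, so none should be marked lost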
+ cleaner.run();
+ Assert.assertEquals(3, facade.getSubClusters(true, true).size());
+ }
+
+ /**
+ * Test the base use case.
+ */
+ @Test
+ public void testSubClusterHeartBeat() throws YarnException {
+ // The first subcluster reports as Unhealthy
+ SubClusterId subClusterId = subClusterIds.get(0);
+ stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
+ .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity"));
+
+    // The second sub-cluster has not heartbeated for two seconds and should
+    // be marked lost
+ subClusterId = subClusterIds.get(1);
+ stateStore.setSubClusterLastHeartbeat(subClusterId,
+ System.currentTimeMillis() - 2000);
+
+ cleaner.run();
+ Assert.assertEquals(1, facade.getSubClusters(true, true).size());
+ }
+
+  /**
+   * This tests the extra robustness: do not mark a sub-cluster as lost if its
+   * heartbeat time was stored in local time but parsed as UTC.
+   */
+ @Test
+ public void testSubClusterUsingLocalTimeParsedAsUTC()
+ throws YarnException, ParseException {
+ Date now = new Date();
+ SimpleDateFormat localTimeFormat = new SimpleDateFormat();
+ SimpleDateFormat utcFormat = new SimpleDateFormat();
+ utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ Date nowLocalParsedAsUtc = utcFormat.parse(localTimeFormat.format(now));
+
+    // The first sub-cluster is using local time parsed as UTC and should NOT
+    // be marked as lost
+ SubClusterId subClusterId = subClusterIds.get(0);
+ stateStore.setSubClusterLastHeartbeat(subClusterId,
+ nowLocalParsedAsUtc.getTime());
+
+    // The second sub-cluster has not heartbeated for two seconds and should
+    // be marked lost
+ subClusterId = subClusterIds.get(1);
+ stateStore.setSubClusterLastHeartbeat(subClusterId,
+ System.currentTimeMillis() - 2000);
+
+    // The third sub-cluster's heartbeat time is in the future, so it should
+    // NOT be marked as lost
+ subClusterId = subClusterIds.get(2);
+ stateStore.setSubClusterLastHeartbeat(subClusterId,
+ System.currentTimeMillis() + 1000);
+
+ cleaner.run();
+ Assert.assertEquals(2, facade.getSubClusters(true, true).size());
+ }
+
+}
\ No newline at end of file