diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 7c06256..0895b77 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -68,6 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * In-memory implementation of {@link FederationStateStore}. */ @@ -158,6 +160,19 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( return SubClusterHeartbeatResponse.newInstance(); } + @VisibleForTesting + public void setSubClusterLastHeartbeat(SubClusterId subClusterId, + long lastHeartbeat) throws YarnException { + GetSubClusterInfoResponse response = + getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)); + SubClusterInfo subClusterInfo = response.getSubClusterInfo(); + if (subClusterInfo == null) { + throw new YarnException( + "Subcluster " + subClusterId.toString() + " does not exist"); + } + subClusterInfo.setLastHeartBeat(lastHeartbeat); + } + @Override public GetSubClusterInfoResponse getSubCluster( GetSubClusterInfoRequest request) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 682eb14..b99f1e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; @@ -62,9 +63,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,6 +224,22 @@ public static FederationStateStoreFacade getInstance() { } /** + * Deregister a subcluster identified by {@code SubClusterId} to + * change state in federation. This can be done to mark the sub cluster lost, + * deregistered, or decommissioned. + * + * @param subClusterId the target subclusterId + * @param subClusterState the state to update it to + * @throws YarnException if the request is invalid/fails + */ + public void deregisterSubCluster(SubClusterId subClusterId, + SubClusterState subClusterState) throws YarnException { + stateStore.deregisterSubCluster( + SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState)); + return; + } + + /** * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. * * @param subClusterId the identifier of the sub-cluster @@ -255,8 +274,7 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId) public SubClusterInfo getSubCluster(final SubClusterId subClusterId, final boolean flushCache) throws YarnException { if (flushCache && isCachingEnabled()) { - LOG.info("Flushing subClusters from cache and rehydrating from store," - + " most likely on account of RM failover."); + LOG.info("Flushing subClusters from cache and rehydrating from store."); cache.remove(buildGetSubClustersCacheRequest(false)); } return getSubCluster(subClusterId); @@ -287,6 +305,26 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId, } /** + * Updates the cache with the central {@link FederationStateStore} and returns + * the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @param flushCache flag to indicate if the cache should be flushed or not + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public Map getSubClusters( + final boolean filterInactiveSubClusters, final boolean flushCache) + throws YarnException { + if (flushCache && isCachingEnabled()) { + LOG.info("Flushing subClusters from cache and rehydrating from store."); + cache.remove(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } + return getSubClusters(filterInactiveSubClusters); + } + + /** * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. * * @param queue the queue whose policy is required diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java new file mode 100644 index 0000000..b30a52a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +/** + * GPGUtils contains utility functions for the GPG + * + */ +public final class GPGUtils { + + // hide constructor + private GPGUtils() { + } + + /** + * Converts the given millisecond duration into a formatted string + * + * @param millis + * @return String representation of the millisecond duration formatted like + * XXD:XXH:XXM:XXS:XXXMS + */ + public static String getDurationString(long millis) { + if (millis < 0) { + throw new IllegalArgumentException("Duration must be non-negative!"); + } + + long days = TimeUnit.MILLISECONDS.toDays(millis); + millis -= TimeUnit.DAYS.toMillis(days); + long hours = TimeUnit.MILLISECONDS.toHours(millis); + millis -= TimeUnit.HOURS.toMillis(hours); + long minutes = TimeUnit.MILLISECONDS.toMinutes(millis); + millis -= TimeUnit.MINUTES.toMillis(minutes); + long seconds = TimeUnit.MILLISECONDS.toSeconds(millis); + millis -= TimeUnit.SECONDS.toMillis(seconds); + + StringBuilder sb = new StringBuilder(64); + sb.append(days); + sb.append("D:"); + sb.append(String.format("%02d", hours)); + sb.append("H:"); + sb.append(String.format("%02d", minutes)); + sb.append("M:"); + sb.append(String.format("%02d", seconds)); + sb.append("S:"); + sb.append(String.format("%03d", millis)); + sb.append("MS"); + + return (sb.toString()); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index c1f7460..0d2ad7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -46,6 +46,27 @@ public static final Logger LOG = LoggerFactory.getLogger(GlobalPolicyGenerator.class); + public static final String FEDERATION_PREFIX = "yarn.federation."; + + // -------------- Federation GlobalPolicyGenerator configuration -----------// + + private static final String FEDERATION_GPG_PREFIX = + FEDERATION_PREFIX + "globalpolicygenerator."; + + /** + * The interval at which the subcluster cleaner runs, default is a minute + */ + public static final String FEDERATION_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; + public static final long DEFAULT_FEDERATION_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = + 60000; + + /** The expiration time for a subcluster heartbeat, default is 30 minutes */ + public static final String FEDERATION_GPG_SUBCLUSTER_HEARTBEAT_EXPIRATION_MS = + FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; + public static final long DEFAULT_FEDERATION_GPG_SUBCLUSTER_HEARTBEAT_EXPIRATION_MS = + 1800000; + // YARN Variables private static CompositeServiceShutdownHook gpgShutdownHook; public static final int SHUTDOWN_HOOK_PRIORITY = 30; @@ -54,6 +75,7 @@ // Federation Variables private GPGContext gpgContext; + private SubClusterCleanerService scCleanerService; public GlobalPolicyGenerator() { super(GlobalPolicyGenerator.class.getName()); @@ -85,6 +107,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.gpgContext .setStateStoreFacade(FederationStateStoreFacade.getInstance()); + this.scCleanerService = new SubClusterCleanerService(this.gpgContext); + addService(this.scCleanerService); + DefaultMetricsSystem.initialize(METRICS_NAME); // super.serviceInit after all services are added diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleaner.java new file mode 100644 index 0000000..9e88acd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleaner.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SubClusterCleaner implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(SubClusterCleaner.class); + + private GPGContext gpgContext; + private long heartbeatExpirationMillis; + + private SimpleDateFormat localTimeFormat; + private SimpleDateFormat utcFormat; + + /** + * The sub-cluster cleaner runnable is invoked by the sub cluster cleaner + * service to check the membership table and remove sub clusters that have not + * sent a heart beat in some amount of time. + */ + public SubClusterCleaner(Configuration conf, GPGContext gpgContext) { + this.heartbeatExpirationMillis = conf.getLong( + GlobalPolicyGenerator.FEDERATION_GPG_SUBCLUSTER_HEARTBEAT_EXPIRATION_MS, + GlobalPolicyGenerator.DEFAULT_FEDERATION_GPG_SUBCLUSTER_HEARTBEAT_EXPIRATION_MS); + this.gpgContext = gpgContext; + this.localTimeFormat = new SimpleDateFormat(); + this.utcFormat = new SimpleDateFormat(); + this.utcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}", + GPGUtils.getDurationString(this.heartbeatExpirationMillis)); + } + + @Override + public void run() { + try { + Date now = new Date(); + // The timestamp of now we will get if the date in statestore is in local + // time, rather than UTC time + Date nowLocalParsedAsUtc = + this.utcFormat.parse(this.localTimeFormat.format(now)); + LOG.info( + "Subcluster cleaner run at nowLocalParsedAsUtc time {}, local time {}", + nowLocalParsedAsUtc, now); + + Map infoMap = this.gpgContext + .getStateStoreFacade().getSubClusters(false, true); + + // Iterate over each sub cluster and check last heartbeat + for (Map.Entry entry : infoMap.entrySet()) { + SubClusterInfo subClusterInfo = entry.getValue(); + + Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat()); + LOG.info("Checking subcluster {} in state {}, last heartbeat at {}", + subClusterInfo.getSubClusterId(), subClusterInfo.getState(), + lastHeartBeat); + + if (!subClusterInfo.getState().isUnusable()) { + boolean shouldDeregister = false; + + long timeUntilDeregister = this.heartbeatExpirationMillis + - (now.getTime() - lastHeartBeat.getTime()); + if (timeUntilDeregister <= 0) { + shouldDeregister = true; + /** + * An exception for robustness. It is possible that somehow local + * time rather than UTC time is put into statestore. So here the + * exception is to not deregister the subcluster if the + * lastHeartBeat falls into the window of local time parsed as utc. + */ + timeUntilDeregister = this.heartbeatExpirationMillis + - (nowLocalParsedAsUtc.getTime() - lastHeartBeat.getTime()); + if (timeUntilDeregister >= 0 + && timeUntilDeregister <= this.heartbeatExpirationMillis) { + LOG.error( + "State store lastHeartBeat is in local time {}" + + " for subcluster {}, skipping for robustness", + lastHeartBeat.toString(), entry.getKey()); + shouldDeregister = false; + } + } + + // Deregister subcluster as SC_LOST if last heartbeat too old + if (shouldDeregister) { + LOG.warn("Deregistering subcluster {}: last heartbeat at {}", + subClusterInfo.getSubClusterId(), + new Date(subClusterInfo.getLastHeartBeat())); + try { + this.gpgContext.getStateStoreFacade().deregisterSubCluster( + subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST); + } catch (Exception e) { + LOG.error("deregisterSubCluster failed on subcluster " + + subClusterInfo.getSubClusterId(), e); + } + } else { + LOG.info("Time until deregister for subcluster {}: {}", + entry.getKey(), + GPGUtils.getDurationString(timeUntilDeregister)); + } + } + } + } catch (Throwable e) { + LOG.error("Subcluster cleaner fails: ", e); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleanerService.java new file mode 100644 index 0000000..86d0422 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/SubClusterCleanerService.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The sub-cluster cleaner is one of the GPE's services that periodically checks + * the membership table and mark subclusters that have not sent a heartbeat in + * certain amount of time as LOST. + */ +public class SubClusterCleanerService extends AbstractService { + + public static final Logger LOG = + LoggerFactory.getLogger(SubClusterCleanerService.class); + + private GPGContext gpgContext; + private long cleanerIntervalMillis; + private SubClusterCleaner subClusterCleaner; + private ScheduledExecutorService scheduledExecutorService; + + public SubClusterCleanerService(GPGContext gpgContext) { + super(SubClusterCleanerService.class.getName()); + this.gpgContext = gpgContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + // Load parameters from the configuration + this.cleanerIntervalMillis = conf.getLong( + GlobalPolicyGenerator.FEDERATION_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, + GlobalPolicyGenerator.DEFAULT_FEDERATION_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); + // Set up the runnable + this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + // Begin the sub cluster cleaner scheduled execution + this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner, 0, + this.cleanerIntervalMillis, TimeUnit.MILLISECONDS); + LOG.info("Started federation sub cluster cleaner with interval: {}", + GPGUtils.getDurationString(this.cleanerIntervalMillis)); + } + + @Override + protected void serviceStop() throws Exception { + // Attempt to stop the sub cluster cleaner scheduled execution + try { + if (this.scheduledExecutorService != null + && !this.scheduledExecutorService.isShutdown()) { + this.scheduledExecutorService.shutdown(); + LOG.info("Stopped federation sub cluster cleaner"); + } + } catch (Exception e) { + LOG.error("Failed to shutdown ScheduledExecutorService", e); + throw e; + } + super.serviceStop(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestSubClusterCleaner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestSubClusterCleaner.java new file mode 100644 index 0000000..6cc96ea --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestSubClusterCleaner.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.TimeZone; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for Sub-cluster Cleaner in GPG. + */ +public class TestSubClusterCleaner { + + Configuration conf; + MemoryFederationStateStore stateStore; + FederationStateStoreFacade facade; + SubClusterCleaner cleaner; + GPGContext gpgContext; + + ArrayList subClusterIds; + + @Before + public void setup() throws YarnException { + conf = new YarnConfiguration(); + + // subcluster expires in one second + conf.setLong( + GlobalPolicyGenerator.FEDERATION_GPG_SUBCLUSTER_HEARTBEAT_EXPIRATION_MS, + 1000); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + + facade = FederationStateStoreFacade.getInstance(); + facade.reinitialize(stateStore, conf); + + gpgContext = new GPGContextImpl(); + gpgContext.setStateStoreFacade(facade); + + cleaner = new SubClusterCleaner(conf, gpgContext); + + // Create and register six sub clusters + subClusterIds = new ArrayList(); + for (int i = 0; i < 3; i++) { + // Create sub cluster id and info + SubClusterId subClusterId = + SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i)); + + SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, + "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4", + SubClusterState.SC_RUNNING, System.currentTimeMillis(), ""); + // Register the sub cluster + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + // Append the id to a local list + subClusterIds.add(subClusterId); + } + } + + @After + public void breakDown() throws Exception { + stateStore.close(); + } + + @Test + public void testSubClusterRegisterHeartBeatTime() throws YarnException { + cleaner.run(); + Assert.assertEquals(3, facade.getSubClusters(true, true).size()); + } + + /** + * Test the base use case. + */ + @Test + public void testSubClusterHeartBeat() throws YarnException { + // The first subcluster reports as Unhealthy + SubClusterId subClusterId = subClusterIds.get(0); + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest + .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity")); + + // The second subcluster didn't heartbeat for two seconds, should mark lost + subClusterId = subClusterIds.get(1); + stateStore.setSubClusterLastHeartbeat(subClusterId, + System.currentTimeMillis() - 2000); + + cleaner.run(); + Assert.assertEquals(1, facade.getSubClusters(true, true).size()); + } + + /** + * This is the test for extra robustness, do not mark the subcluster as lost + * if its heart beat time is stored in local time but parsed as UTC. + */ + @Test + public void testSubClusterUsingLocalTimeParsedAsUTC() + throws YarnException, ParseException { + Date now = new Date(); + SimpleDateFormat localTimeFormat = new SimpleDateFormat(); + SimpleDateFormat utcFormat = new SimpleDateFormat(); + utcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + Date nowLocalParsedAsUtc = utcFormat.parse(localTimeFormat.format(now)); + + // The first subcluster is using localTime parsed as UTC, should NOT be + // marked as lost + SubClusterId subClusterId = subClusterIds.get(0); + stateStore.setSubClusterLastHeartbeat(subClusterId, + nowLocalParsedAsUtc.getTime()); + + // The second subcluster didn't heartbeat for two seconds, should mark lost + subClusterId = subClusterIds.get(1); + stateStore.setSubClusterLastHeartbeat(subClusterId, + System.currentTimeMillis() - 2000); + + // The third subcluster's heartbeat time is in the future, should no be + // marked as lost + subClusterId = subClusterIds.get(2); + stateStore.setSubClusterLastHeartbeat(subClusterId, + System.currentTimeMillis() + 1000); + + cleaner.run(); + Assert.assertEquals(2, facade.getSubClusters(true, true).size()); + } + +} \ No newline at end of file