diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c4fdb79..afd882a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -151,6 +151,18 @@ public static AllocateResponse newInstance(int responseId,
public abstract void setResponseId(int responseId);
/**
+ * Get the next heartbeat interval.
+ * @return next heartbeat interval
+ */
+ @Public
+ @Stable
+ public abstract long getNextHeartbeatInterval();
+
+ @Private
+ @Unstable
+ public abstract void setNextHeartbeatInterval(long nextHeartbeatInterval);
+
+ /**
* Get the list of newly allocated Container by the
* ResourceManager.
* @return list of newly allocated Container
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 48a75c0..467a61c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -338,11 +338,51 @@ private static void addDeprecatedKeys() {
public static final String DEFAULT_RM_SCHEDULER =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
- /** RM set next Heartbeat interval for NM */
+ /** Adaptive heartbeat interval */
+ public static final String ADAPTIVE_HEARTBEAT =
+ RM_PREFIX + "adaptive-heartbeat.enable";
+ public static final boolean DEFAULT_ADAPTIVE_HEARTBEAT = false;
+
+ /** RM set default next Heartbeat interval for AM */
+ public static final String RM_AM_HEARTBEAT_INTERVAL_MS =
+ RM_PREFIX + "applicationmasters.heartbeat-interval-ms";
+ public static final long DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS = 1000;
+
+ /** Heartbeat delay ratio for AM */
+ public static final String RM_AM_HEARTBEAT_DELAY_RATIO =
+ RM_PREFIX + "applicationmasters.heartbeat-delay-ratio";
+ public static final float DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO = 0.5f;
+
+ /** The 'Gini Coefficient' of heartbeat delay among AMs */
+ public static final String RM_AM_HEARTBEAT_DELAY_GINI =
+ RM_PREFIX + "applicationmasters.heartbeat-delay-gini";
+ public static final float DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI = 2.0f;
+
+ /** The upper limit of the AM heratbeat interval */
+ public static final String RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS =
+ RM_PREFIX + "applicationmasters.heartbeat-upper-limit";
+ public static final long DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS = 10000;
+
+ /** RM set default next Heartbeat interval for NM */
public static final String RM_NM_HEARTBEAT_INTERVAL_MS =
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
+ /** Heartbeat delay ratio for NM */
+ public static final String RM_NM_HEARTBEAT_DELAY_RATIO =
+ RM_PREFIX + "nodemanagers.heartbeat-delay-ratio";
+ public static final float DEFAULT_RM_NM_HEARTBEAT_DELAY_RATIO = 1.0f;
+
+ /** The 'Gini Coefficient' of heartbeat delay among NMs */
+ public static final String RM_NM_HEARTBEAT_DELAY_GINI =
+ RM_PREFIX + "nodemanagers.heartbeat-delay-gini";
+ public static final float DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI = 2.0f;
+
+ /** The upper limit of the NM heratbeat interval */
+ public static final String RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS =
+ RM_PREFIX + "nodemanagers.heartbeat-upper-limit";
+ public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS = 30000;
+
/** Number of worker threads that write the history data. */
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 410b663..49708a8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -87,6 +87,7 @@ message AllocateResponseProto {
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12;
+ optional int64 nextHeartbeatInterval = 13;
}
enum SchedulerResourceTypes {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index f2796fd..48cd3ef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -209,6 +209,18 @@ public synchronized void setResponseId(int responseId) {
}
@Override
+ public synchronized long getNextHeartbeatInterval() {
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getNextHeartbeatInterval());
+ }
+
+ @Override
+ public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) {
+ maybeInitBuilder();
+ builder.setNextHeartbeatInterval(nextHeartbeatInterval);
+ }
+
+ @Override
public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index cd1dacf..2a7ea33 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@@ -92,11 +93,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.SystemClock;
+
@SuppressWarnings("unchecked")
@Private
@@ -105,6 +110,11 @@
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler;
+ private boolean adaptiveHeartbeat;
+ private long defaultHeartbeatInterval;
+ private long heartbeatIntervalUpperLimit;
+ private float delayRatio;
+ private AdaptiveHeartbeatPolicy heartbeatPolicy;
private InetSocketAddress bindAddress;
private Server server;
private final RecordFactory recordFactory =
@@ -112,12 +122,14 @@
private final ConcurrentMap responseMap =
new ConcurrentHashMap();
private final RMContext rmContext;
+ private volatile Clock clock;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.rmContext = rmContext;
+ this.clock = new SystemClock();
}
@Override
@@ -125,6 +137,40 @@ protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
+ adaptiveHeartbeat =
+ conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT,
+ YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT);
+ defaultHeartbeatInterval =
+ conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS);
+ if (defaultHeartbeatInterval <= 0) {
+ defaultHeartbeatInterval =
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS;
+ LOG.warn("Invalid Configuration. "
+ + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS
+ + " should be larger than 0. Now use the default value.");
+ }
+ heartbeatIntervalUpperLimit =
+ conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS);
+ if (heartbeatIntervalUpperLimit <= 0) {
+ heartbeatIntervalUpperLimit =
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS;
+ LOG.warn("Invalid Configuration. "
+ + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS
+ + " should be larger than 0. Now use the default value.");
+ }
+ delayRatio =
+ conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_RATIO,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO);
+ if (delayRatio < 0) {
+ delayRatio = YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO;
+ LOG.warn("Invalid Configuration. "
+ + YarnConfiguration.RM_AM_HEARTBEAT_DELAY_RATIO
+ + " should be 0 or positive. Now use the default value.");
+ }
+ heartbeatPolicy = new AdaptiveHeartbeatPolicy();
+
InetSocketAddress masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_SCHEDULER_ADDRESS,
@@ -463,6 +509,15 @@ public AllocateResponse allocate(AllocateRequest request)
throw new InvalidApplicationMasterRequestException(message);
}
+ long timestamp = clock.getTime();
+ if (lock.getTimestamp() + lock.getNextHeartbeatInterval() < timestamp) {
+ lock.setTimestamp(timestamp);
+ } else {
+ // TODO: support out-of-band heartbeat?
+ // Now we allow any out-of-band heartbeat.
+ lock.setTimestamp(timestamp);
+ }
+
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
@@ -580,6 +635,12 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+ // Set the adaptive heartbeat interval.
+ long nextHeartbeatInterval =
+ heartbeatPolicy.getNextHeartbeatInterval(appAttemptId);
+ allocateResponse.setNextHeartbeatInterval(nextHeartbeatInterval);
+ lock.setNextHeartbeatInterval(nextHeartbeatInterval);
+
// add preemption to the allocateResponse message (if any)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
@@ -702,22 +763,74 @@ protected void serviceStop() throws Exception {
public static class AllocateResponseLock {
private AllocateResponse response;
-
+ private long timestamp = 0L;
+ private long nextHeartbeatInterval = 0L;
+
public AllocateResponseLock(AllocateResponse response) {
this.response = response;
}
-
+
public synchronized AllocateResponse getAllocateResponse() {
return response;
}
-
+
public synchronized void setAllocateResponse(AllocateResponse response) {
this.response = response;
}
+
+ public synchronized long getTimestamp() {
+ return timestamp;
+ }
+
+ public synchronized void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public synchronized long getNextHeartbeatInterval() {
+ return nextHeartbeatInterval;
+ }
+
+ public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) {
+ this.nextHeartbeatInterval = nextHeartbeatInterval;
+ }
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
+
+ private class AdaptiveHeartbeatPolicy {
+ public long getNextHeartbeatInterval(ApplicationAttemptId appAttemptId) {
+ if (!adaptiveHeartbeat) {
+ return defaultHeartbeatInterval;
+ } else {
+ float standardDelay =
+ SchedulerMetrics.getMetrics().getNumWaitingEvents() * delayRatio;
+ long interval = defaultHeartbeatInterval + (long) (standardDelay
+ * rScheduler.getHeartbeatDelayRescale(appAttemptId));
+ return interval < heartbeatIntervalUpperLimit
+ ? interval : heartbeatIntervalUpperLimit;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void initAdaptiveHeartbeat() {
+ Configuration conf = getConfig();
+
+ adaptiveHeartbeat =
+ conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT,
+ YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT);
+ defaultHeartbeatInterval =
+ conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS);
+ heartbeatIntervalUpperLimit =
+ conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS);
+ delayRatio =
+ conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_RATIO,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO);
+ this.heartbeatPolicy = new AdaptiveHeartbeatPolicy();
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index c209873..b719333 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -675,6 +675,10 @@ public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
+ SchedulerMetrics.getMetrics().decrNumWaitingEvents();
+ if (event.getType().equals(SchedulerEventType.NODE_UPDATE)) {
+ SchedulerMetrics.getMetrics().decrNumWaitingNodeUpdateEvents();
+ }
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
@@ -727,6 +731,10 @@ public void handle(SchedulerEvent event) {
+ remCapacity);
}
this.eventQueue.put(event);
+ SchedulerMetrics.getMetrics().incrNumWaitingEvents();
+ if (event.getType().equals(SchedulerEventType.NODE_UPDATE)) {
+ SchedulerMetrics.getMetrics().incrNumWaitingNodeUpdateEvents();
+ }
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 16b6a89..fac001c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -91,7 +92,11 @@
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
- private long nextHeartBeatInterval;
+ private boolean adaptiveHeartbeat;
+ private long defaultHeartbeatInterval;
+ private long heartbeatIntervalUpperLimit;
+ private float delayRatio;
+ private AdaptiveHeartbeatPolicy heartbeatPolicy;
private Server server;
private InetSocketAddress resourceTrackerAddress;
private String minimumNodeManagerVersion;
@@ -135,14 +140,39 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
RackResolver.init(conf);
- nextHeartBeatInterval =
+ adaptiveHeartbeat =
+ conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT,
+ YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT);
+ defaultHeartbeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
- if (nextHeartBeatInterval <= 0) {
- throw new YarnRuntimeException("Invalid Configuration. "
+ if (defaultHeartbeatInterval <= 0) {
+ defaultHeartbeatInterval =
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
+ LOG.warn("Invalid Configuration. "
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
- + " should be larger than 0.");
+ + " should be larger than 0. Now use the default value.");
}
+ heartbeatIntervalUpperLimit =
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS);
+ if (heartbeatIntervalUpperLimit <= 0) {
+ heartbeatIntervalUpperLimit =
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS;
+ LOG.warn("Invalid Configuration. "
+ + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS
+ + " should be larger than 0. Now use the default value.");
+ }
+ delayRatio =
+ conf.getFloat(YarnConfiguration.RM_NM_HEARTBEAT_DELAY_RATIO,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_RATIO);
+ if (delayRatio < 0) {
+ delayRatio = YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_RATIO;
+ LOG.warn("Invalid Configuration. "
+ + YarnConfiguration.RM_NM_HEARTBEAT_DELAY_RATIO
+ + " should be 0 or positive. Now use the default value.");
+ }
+ heartbeatPolicy = new AdaptiveHeartbeatPolicy();
minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -171,7 +201,7 @@ protected void serviceStart() throws Exception {
this.server =
rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
conf, null,
- conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
+ conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
// Enable service authorization?
@@ -190,9 +220,9 @@ protected void serviceStart() throws Exception {
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
- YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
- server.getListenerAddress());
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ server.getListenerAddress());
}
@Override
@@ -438,10 +468,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
// Heartbeat response
+ long nextHeartbeatInterval = heartbeatPolicy.getNextHeartbeatInterval(nodeId);
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
- nextHeartBeatInterval);
+ nextHeartbeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
@@ -547,4 +578,19 @@ void refreshServiceAcls(Configuration configuration,
public Server getServer() {
return this.server;
}
+
+ private class AdaptiveHeartbeatPolicy {
+ public long getNextHeartbeatInterval(NodeId nodeId) {
+ if (!adaptiveHeartbeat) {
+ return defaultHeartbeatInterval;
+ } else {
+ float standardDelay =
+ SchedulerMetrics.getMetrics().getNumWaitingNodeUpdateEvents() * delayRatio;
+ long interval = defaultHeartbeatInterval + (long) (standardDelay *
+ rmContext.getScheduler().getHeartbeatDelayRescale(nodeId));
+ return interval < heartbeatIntervalUpperLimit
+ ? interval : heartbeatIntervalUpperLimit;
+ }
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java
new file mode 100644
index 0000000..b275387
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class SchedulerMetrics {
+
+ private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+ @Metric("# of waiting scheduler events")
+ MutableGaugeInt numWaitingEvents;
+ @Metric("# of waiting node update events")
+ MutableGaugeInt numWaitingNodeUpdateEvents;
+
+ private static final MetricsInfo RECORD_INFO =
+ info("SchedulerMetrics", "Metrics for the Yarn Scheduler");
+ private static volatile SchedulerMetrics INSTANCE = null;
+ private static MetricsRegistry registry;
+
+ // SchedulerMetrics should be a singleton.
+ private SchedulerMetrics() {};
+
+ public static SchedulerMetrics getMetrics() {
+ if(!isInitialized.get()){
+ synchronized (SchedulerMetrics.class) {
+ if(INSTANCE == null){
+ INSTANCE = new SchedulerMetrics();
+ registerMetrics();
+ isInitialized.set(true);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ private static void registerMetrics() {
+ registry = new MetricsRegistry(RECORD_INFO);
+ registry.tag(RECORD_INFO, "ResourceManager");
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ if (ms != null) {
+ ms.register("SchedulerMetrics", "Metrics for the Yarn Scheduler",
+ INSTANCE);
+ }
+ }
+
+ // Waiting scheduler events
+ public int getNumWaitingEvents() {
+ return numWaitingEvents.value();
+ }
+
+ public void incrNumWaitingEvents() {
+ numWaitingEvents.incr();
+ }
+
+ public void decrNumWaitingEvents() {
+ numWaitingEvents.decr();
+ }
+
+ public void setNumWaitingEvents(int num) {
+ numWaitingEvents.set(num);
+ }
+
+ public int getNumWaitingNodeUpdateEvents() {
+ return numWaitingNodeUpdateEvents.value();
+ }
+
+ public void decrNumWaitingNodeUpdateEvents() {
+ numWaitingNodeUpdateEvents.decr();
+ }
+
+ public void incrNumWaitingNodeUpdateEvents() {
+ numWaitingNodeUpdateEvents.incr();
+ }
+
+ public void setNumWaitingNodeUpdateEvents(int num) {
+ numWaitingNodeUpdateEvents.set(num);
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index b99b217..5b3512c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -286,4 +286,14 @@ void setEntitlement(String queue, QueueEntitlement entitlement)
* @return an EnumSet containing the resource types
*/
public EnumSet getSchedulingResourceTypes();
+
+ /**
+ * Return a suggested heartbeat delay scale, 1.0 if respect standard delay.
+ * @param obj to which attempt the interval is set,
+ * support {@link ApplicationAttemptId} and {@link NodeId}.
+ * @return the value of the heartbeat interval
+ */
+ @LimitedPrivate("yarn")
+ @Unstable
+ float getHeartbeatDelayRescale(Object obj);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 48c7f2f..0f568bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.util.SampleStat;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
@@ -136,6 +137,12 @@
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
+ private SampleStat[] amHeartbeatRescaleFactors = new SampleStat[1];
+ private SampleStat amHeartbeatRescale = new SampleStat();
+ private SampleStat nodeHeartbeatRescale = new SampleStat();
+ private float amMaxScale;
+ private float nodeMaxScale;
+
static final Comparator nonPartitionedQueueComparator =
new Comparator() {
@Override
@@ -325,6 +332,21 @@ private synchronized void initScheduler(Configuration configuration) throws
asyncSchedulerThread = new AsyncScheduleThread(this);
}
+ amMaxScale = conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI);
+ if (amMaxScale < 1) {
+ amMaxScale = YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI;
+ LOG.warn("Deployed YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI should" +
+ " larger than 1, now use the default value.");
+ }
+ nodeMaxScale = conf.getFloat(YarnConfiguration.RM_NM_HEARTBEAT_DELAY_GINI,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI);
+ if (nodeMaxScale < 1) {
+ nodeMaxScale = YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI;
+ LOG.warn("Deployed YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI" +
+ " should larger than 1, now use the default value.");
+ }
+
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
@@ -1781,4 +1803,45 @@ public SchedulerHealth getSchedulerHealth() {
private synchronized void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}
+
+ @Override
+ public float getHeartbeatDelayRescale(Object obj) {
+ if (obj instanceof ApplicationAttemptId) {
+ FiCaSchedulerApp appAttempt = getApplicationAttempt((ApplicationAttemptId) obj);
+ if (appAttempt != null) {
+ float capacity =
+ appAttempt.getQueue().getQueueInfo(false, false).getCapacity();
+ amHeartbeatRescaleFactors[0].add(capacity);
+ float reScale =
+ normalizeScale(amHeartbeatRescaleFactors[0], capacity, amMaxScale);
+ amHeartbeatRescale.add(reScale);
+ return normalizeScale(amHeartbeatRescale, reScale, amMaxScale);
+ }
+ } else if (obj instanceof NodeId) {
+ // [need more thinking] We may not discriminate nodes as what we do for apps,
+ // because nodes are not as variant as apps, and it's hard to determine the next delay
+ // of a node heartbeat by any expectation or criterion. However, we random the scale
+ // in a range to avoid clustered heartbeat around particular time intervals.
+ double rand = Math.random();
+ return (float) ((rand < 0.5) ? (1 / nodeMaxScale + (1 - 1 / nodeMaxScale) * rand / 0.5)
+ : (1 + (nodeMaxScale - 1) * (rand - 0.5) / 0.5));
+ } else {
+ // it should not arrive here.
+ }
+ return 1.0f;
+ }
+
+ private float normalizeScale(SampleStat stat, float value, float maxScale) {
+ double min = stat.min();
+ double max = stat.max();
+ double mean = stat.mean();
+ // If value is between min and mean, we map it to interval [1/amMaxScale, 1],
+ // and if value is between mean and max, we map it to interval [1, amMaxScale].
+ if (value < mean) {
+ return (float) (1 / maxScale + (1 - 1 / maxScale)
+ * (value - min) / (mean - min));
+ } else {
+ return (float) (1 + (maxScale - 1) * (value - mean) / (max - mean));
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f481de5..7417039 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.util.SampleStat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -49,6 +50,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -185,6 +187,12 @@
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
+ private SampleStat[] amHeartbeatRescaleFactors = new SampleStat[3];
+ private SampleStat amHeartbeatRescale = new SampleStat();
+ private SampleStat nodeHeartbeatRescale = new SampleStat();
+ private float amMaxScale;
+ private float nodeMaxScale;
+
@VisibleForTesting
final MaxRunningAppsEnforcer maxRunningEnforcer;
@@ -1325,6 +1333,21 @@ private void initScheduler(Configuration conf) throws IOException {
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
+ amMaxScale = conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI);
+ if (amMaxScale < 1) {
+ amMaxScale = YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI;
+ LOG.warn("Deployed YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI should" +
+ " larger than 1, now use the default value.");
+ }
+ nodeMaxScale = conf.getFloat(YarnConfiguration.RM_NM_HEARTBEAT_DELAY_GINI,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI);
+ if (nodeMaxScale < 1) {
+ nodeMaxScale = YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI;
+ LOG.warn("Deployed YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI" +
+ " should larger than 1, now use the default value.");
+ }
+
updateInterval = this.conf.getUpdateInterval();
if (updateInterval < 0) {
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
@@ -1689,4 +1712,57 @@ private String handleMoveToPlanQueue(String targetQueueName) {
}
return targetQueueName;
}
+
+ @Override
+ public float getHeartbeatDelayRescale(Object obj) {
+ if (obj instanceof ApplicationAttemptId) {
+ FSAppAttempt appAttempt = getSchedulerApp((ApplicationAttemptId) obj);
+ if (appAttempt != null) {
+ int priority = appAttempt.getPriority().getPriority()
+ * appAttempt.getQueue().getPriority().getPriority();
+ amHeartbeatRescaleFactors[0].add(priority);
+ float weight = appAttempt.getWeights().getWeight(ResourceType.MEMORY)
+ * appAttempt.getQueue().getWeights().getWeight(ResourceType.MEMORY);
+ amHeartbeatRescaleFactors[1].add(weight);
+ Resource starvation =
+ Resources.min(RESOURCE_CALCULATOR, clusterResource,
+ Resources.subtract(appAttempt.getFairShare(),
+ appAttempt.getCurrentConsumption()),
+ appAttempt.getAppAttemptResourceUsage().getPending());
+ amHeartbeatRescaleFactors[2].add(starvation.getMemory());
+
+ float reScale =
+ normalizeScale(amHeartbeatRescaleFactors[0], priority, amMaxScale)
+ / normalizeScale(amHeartbeatRescaleFactors[1], weight, amMaxScale)
+ / normalizeScale(amHeartbeatRescaleFactors[2], starvation.getMemory(), amMaxScale);
+ amHeartbeatRescale.add(reScale);
+ return normalizeScale(amHeartbeatRescale, reScale, amMaxScale);
+ }
+ } else if (obj instanceof NodeId) {
+ // [need more thinking] We may not discriminate nodes as what we do for apps,
+ // because nodes are not as variant as apps, and it's hard to determine the next delay
+ // of a node heartbeat by any expectation or criterion. However, we random the scale
+ // in a range to avoid clustered heartbeat around particular time intervals.
+ double rand = Math.random();
+ return (float) ((rand < 0.5) ? (1 / nodeMaxScale + (1 - 1 / nodeMaxScale) * rand / 0.5)
+ : (1 + (nodeMaxScale - 1) * (rand - 0.5) / 0.5));
+ } else {
+ // it should not arrive here.
+ }
+ return 1.0f;
+ }
+
+ private float normalizeScale(SampleStat stat, float value, float maxScale) {
+ double min = stat.min();
+ double max = stat.max();
+ double mean = stat.mean();
+ // If value is between min and mean, we map it to interval [1/amMaxScale, 1],
+ // and if value is between mean and max, we map it to interval [1, amMaxScale].
+ if (value < mean) {
+ return (float) (1 / maxScale + (1 - 1 / maxScale)
+ * (value - min) / (mean - min));
+ } else {
+ return (float) (1 + (maxScale - 1) * (value - mean) / (max - mean));
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index b8c419c..e542d89 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -983,4 +983,9 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
public Resource getUsedResource() {
return usedResource;
}
+
+ @Override
+ public float getHeartbeatDelayRescale(Object obj) {
+ return 1.0f;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index ca5c7a4..d0856e8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -68,6 +68,35 @@ public static void setup() {
ResourceScheduler.class);
}
+ @Test(timeout = 50000)
+ public void testGetNextHeartBeatInterval() throws Exception {
+ conf.set(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, "7500");
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ // Init the conf and the adaptiveHeartbeatPolicy
+ rm.getApplicationMasterService().initAdaptiveHeartbeat();
+
+ // Register node1
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+ // Submit an application
+ RMApp app1 = rm.submitApp(2048);
+
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ am1.addRequests(new String[]{"127.0.0.1"}, GB, 1, 1);
+ AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+ Assert.assertEquals(7500L, alloc1Response.getNextHeartbeatInterval());
+
+ rm.stop();
+ }
+
@Test(timeout = 3000000)
public void testRMIdentifierOnContainerAllocation() throws Exception {
MockRM rm = new MockRM(conf);