diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c4fdb79..afd882a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -151,6 +151,18 @@ public static AllocateResponse newInstance(int responseId,
public abstract void setResponseId(int responseId);
/**
+ * Get the next heartbeat interval.
+ * @return next heartbeat interval
+ */
+ @Public
+ @Stable
+ public abstract long getNextHeartbeatInterval();
+
+ @Private
+ @Unstable
+ public abstract void setNextHeartbeatInterval(long nextHeartbeatInterval);
+
+ /**
* Get the list of newly allocated Container by the
* ResourceManager.
* @return list of newly allocated Container
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 48a75c0..8d4f924 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -338,6 +338,16 @@ private static void addDeprecatedKeys() {
public static final String DEFAULT_RM_SCHEDULER =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
+ /** Adaptive heartbeat interval */
+ public static final String ADAPTIVE_HEARTBEAT =
+ RM_PREFIX + "adaptive-heartbeat.enable";
+ public static final boolean DEFAULT_ADAPTIVE_HEARTBEAT = false;
+
+ /** RM set next Heartbeat interval for AM */
+ public static final String RM_AM_HEARTBEAT_INTERVAL_MS =
+ RM_PREFIX + "applicationmasters.heartbeat-interval-ms";
+ public static final long DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS = 1000;
+
/** RM set next Heartbeat interval for NM */
public static final String RM_NM_HEARTBEAT_INTERVAL_MS =
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 410b663..49708a8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -87,6 +87,7 @@ message AllocateResponseProto {
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12;
+ optional int64 nextHeartbeatInterval = 13;
}
enum SchedulerResourceTypes {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index f2796fd..48cd3ef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -209,6 +209,18 @@ public synchronized void setResponseId(int responseId) {
}
@Override
+ public synchronized long getNextHeartbeatInterval() {
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getNextHeartbeatInterval());
+ }
+
+ @Override
+ public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) {
+ maybeInitBuilder();
+ builder.setNextHeartbeatInterval(nextHeartbeatInterval);
+ }
+
+ @Override
public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index cd1dacf..3abed59 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@@ -95,8 +96,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.SystemClock;
+
@SuppressWarnings("unchecked")
@Private
@@ -105,6 +109,9 @@
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler;
+ private boolean adaptiveHeartbeat;
+ private long defaultHeartbeatInterval;
+ private AdaptiveHeartbeatPolicy heartbeatPolicy;
private InetSocketAddress bindAddress;
private Server server;
private final RecordFactory recordFactory =
@@ -112,12 +119,14 @@
private final ConcurrentMap responseMap =
new ConcurrentHashMap();
private final RMContext rmContext;
+ private volatile Clock clock;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.rmContext = rmContext;
+ this.clock = new SystemClock();
}
@Override
@@ -125,6 +134,19 @@ protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
+ adaptiveHeartbeat =
+ conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT,
+ YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT);
+ defaultHeartbeatInterval =
+ conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS);
+ if (defaultHeartbeatInterval <= 0) {
+ throw new YarnRuntimeException("Invalid Configuration. "
+ + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS
+ + " should be larger than 0.");
+ }
+ heartbeatPolicy = new AdaptiveHeartbeatPolicy();
+
InetSocketAddress masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_SCHEDULER_ADDRESS,
@@ -463,6 +485,15 @@ public AllocateResponse allocate(AllocateRequest request)
throw new InvalidApplicationMasterRequestException(message);
}
+ long timestamp = clock.getTime();
+ if (lock.getTimestamp() + lock.getNextHeartbeatInterval() < timestamp) {
+ lock.setTimestamp(timestamp);
+ } else {
+ // TODO: support out-of-band heartbeat?
+ // Now we allow any out-of-band heartbeat.
+ lock.setTimestamp(timestamp);
+ }
+
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
@@ -580,6 +611,12 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+ // Set the adaptive heartbeat interval.
+ long nextHeartbeatInterval =
+ heartbeatPolicy.getNextHeartbeatInterval(appAttemptId);
+ allocateResponse.setNextHeartbeatInterval(nextHeartbeatInterval);
+ lock.setNextHeartbeatInterval(nextHeartbeatInterval);
+
// add preemption to the allocateResponse message (if any)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
@@ -702,22 +739,70 @@ protected void serviceStop() throws Exception {
public static class AllocateResponseLock {
private AllocateResponse response;
-
+ private long timestamp = 0L;
+ private long nextHeartbeatInterval = 0L;
+
public AllocateResponseLock(AllocateResponse response) {
this.response = response;
}
-
+
public synchronized AllocateResponse getAllocateResponse() {
return response;
}
-
+
public synchronized void setAllocateResponse(AllocateResponse response) {
this.response = response;
}
+
+ public synchronized long getTimestamp() {
+ return timestamp;
+ }
+
+ public synchronized void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public synchronized long getNextHeartbeatInterval() {
+ return nextHeartbeatInterval;
+ }
+
+ public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) {
+ this.nextHeartbeatInterval = nextHeartbeatInterval;
+ }
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
+
+ private class AdaptiveHeartbeatPolicy {
+ public long getNextHeartbeatInterval(ApplicationAttemptId appAttemptId) {
+ if (!adaptiveHeartbeat) {
+ return defaultHeartbeatInterval;
+ } else {
+ // TODO: implement policy
+ return defaultHeartbeatInterval;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void initAdaptiveHeartbeat() {
+ Configuration conf = getConfig();
+
+ adaptiveHeartbeat =
+ conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT,
+ YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT);
+ defaultHeartbeatInterval =
+ conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS);
+ if (defaultHeartbeatInterval <= 0) {
+ throw new YarnRuntimeException("Invalid Configuration. "
+ + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS
+ + " should be larger than 0.");
+ }
+
+ this.heartbeatPolicy = new AdaptiveHeartbeatPolicy();
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 16b6a89..c44ccc1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -91,7 +91,9 @@
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
- private long nextHeartBeatInterval;
+ private boolean adaptiveHeartbeat;
+ private long defaultHeartbeatInterval;
+ private AdaptiveHeartbeatPolicy heartbeatPolicy;
private Server server;
private InetSocketAddress resourceTrackerAddress;
private String minimumNodeManagerVersion;
@@ -135,14 +137,18 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
RackResolver.init(conf);
- nextHeartBeatInterval =
+ adaptiveHeartbeat =
+ conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT,
+ YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT);
+ defaultHeartbeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
- if (nextHeartBeatInterval <= 0) {
+ if (defaultHeartbeatInterval <= 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
+ " should be larger than 0.");
}
+ heartbeatPolicy = new AdaptiveHeartbeatPolicy();
minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -171,7 +177,7 @@ protected void serviceStart() throws Exception {
this.server =
rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
conf, null,
- conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
+ conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
// Enable service authorization?
@@ -190,9 +196,9 @@ protected void serviceStart() throws Exception {
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
- YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
- server.getListenerAddress());
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ server.getListenerAddress());
}
@Override
@@ -438,10 +444,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
// Heartbeat response
+ long nextHeartbeatInterval = heartbeatPolicy.getNextHeartbeatInterval();
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
- nextHeartBeatInterval);
+ nextHeartbeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
@@ -547,4 +554,15 @@ void refreshServiceAcls(Configuration configuration,
public Server getServer() {
return this.server;
}
+
+ private class AdaptiveHeartbeatPolicy {
+ public long getNextHeartbeatInterval() {
+ if (!adaptiveHeartbeat) {
+ return defaultHeartbeatInterval;
+ } else {
+ // TODO: implement policy
+ return defaultHeartbeatInterval;
+ }
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index ca5c7a4..d0856e8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -68,6 +68,35 @@ public static void setup() {
ResourceScheduler.class);
}
+ @Test(timeout = 50000)
+ public void testGetNextHeartBeatInterval() throws Exception {
+ conf.set(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, "7500");
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ // Init the conf and the adaptiveHeartbeatPolicy
+ rm.getApplicationMasterService().initAdaptiveHeartbeat();
+
+ // Register node1
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+ // Submit an application
+ RMApp app1 = rm.submitApp(2048);
+
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ am1.addRequests(new String[]{"127.0.0.1"}, GB, 1, 1);
+ AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+ Assert.assertEquals(7500L, alloc1Response.getNextHeartbeatInterval());
+
+ rm.stop();
+ }
+
@Test(timeout = 3000000)
public void testRMIdentifierOnContainerAllocation() throws Exception {
MockRM rm = new MockRM(conf);