diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index c4fdb79..afd882a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -151,6 +151,18 @@ public static AllocateResponse newInstance(int responseId, public abstract void setResponseId(int responseId); /** + * Get the next heartbeat interval. + * @return next heartbeat interval + */ + @Public + @Stable + public abstract long getNextHeartbeatInterval(); + + @Private + @Unstable + public abstract void setNextHeartbeatInterval(long nextHeartbeatInterval); + + /** * Get the list of newly allocated Container by the * ResourceManager. * @return list of newly allocated Container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 48a75c0..8d4f924 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -338,6 +338,16 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_RM_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; + /** Adaptive heartbeat interval */ + public static final String ADAPTIVE_HEARTBEAT = + RM_PREFIX + "adaptive-heartbeat.enable"; + public static final boolean DEFAULT_ADAPTIVE_HEARTBEAT = false; + + /** RM set next Heartbeat interval for AM */ + public static final String RM_AM_HEARTBEAT_INTERVAL_MS = + RM_PREFIX + "applicationmasters.heartbeat-interval-ms"; + public static final long DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS = 1000; + /** RM set next Heartbeat interval for NM */ public static final String RM_NM_HEARTBEAT_INTERVAL_MS = RM_PREFIX + "nodemanagers.heartbeat-interval-ms"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 410b663..49708a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -87,6 +87,7 @@ message AllocateResponseProto { repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; + optional int64 nextHeartbeatInterval = 13; } enum SchedulerResourceTypes { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index f2796fd..48cd3ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -209,6 +209,18 @@ public synchronized void setResponseId(int responseId) { } @Override + public synchronized long getNextHeartbeatInterval() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.getNextHeartbeatInterval()); + } + + @Override + public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) { + maybeInitBuilder(); + builder.setNextHeartbeatInterval(nextHeartbeatInterval); + } + + @Override public synchronized Resource getAvailableResources() { if (this.limit != null) { return this.limit; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index cd1dacf..3abed59 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -95,8 +96,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.SystemClock; + @SuppressWarnings("unchecked") @Private @@ -105,6 +109,9 @@ private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; + private boolean adaptiveHeartbeat; + private long defaultHeartbeatInterval; + private AdaptiveHeartbeatPolicy heartbeatPolicy; private InetSocketAddress bindAddress; private Server server; private final RecordFactory recordFactory = @@ -112,12 +119,14 @@ private final ConcurrentMap responseMap = new ConcurrentHashMap(); private final RMContext rmContext; + private volatile Clock clock; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; this.rmContext = rmContext; + this.clock = new SystemClock(); } @Override @@ -125,6 +134,19 @@ protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); + adaptiveHeartbeat = + conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT, + YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT); + defaultHeartbeatInterval = + conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS); + if (defaultHeartbeatInterval <= 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS + + " should be larger than 0."); + } + heartbeatPolicy = new AdaptiveHeartbeatPolicy(); + InetSocketAddress masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_SCHEDULER_ADDRESS, @@ -463,6 +485,15 @@ public AllocateResponse allocate(AllocateRequest request) throw new InvalidApplicationMasterRequestException(message); } + long timestamp = clock.getTime(); + if (lock.getTimestamp() + lock.getNextHeartbeatInterval() < timestamp) { + lock.setTimestamp(timestamp); + } else { + // TODO: support out-of-band heartbeat? + // Now we allow any out-of-band heartbeat. + lock.setTimestamp(timestamp); + } + //filter illegal progress values float filteredProgress = request.getProgress(); if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY @@ -580,6 +611,12 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + // Set the adaptive heartbeat interval. + long nextHeartbeatInterval = + heartbeatPolicy.getNextHeartbeatInterval(appAttemptId); + allocateResponse.setNextHeartbeatInterval(nextHeartbeatInterval); + lock.setNextHeartbeatInterval(nextHeartbeatInterval); + // add preemption to the allocateResponse message (if any) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); @@ -702,22 +739,70 @@ protected void serviceStop() throws Exception { public static class AllocateResponseLock { private AllocateResponse response; - + private long timestamp = 0L; + private long nextHeartbeatInterval = 0L; + public AllocateResponseLock(AllocateResponse response) { this.response = response; } - + public synchronized AllocateResponse getAllocateResponse() { return response; } - + public synchronized void setAllocateResponse(AllocateResponse response) { this.response = response; } + + public synchronized long getTimestamp() { + return timestamp; + } + + public synchronized void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public synchronized long getNextHeartbeatInterval() { + return nextHeartbeatInterval; + } + + public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) { + this.nextHeartbeatInterval = nextHeartbeatInterval; + } } @VisibleForTesting public Server getServer() { return this.server; } + + private class AdaptiveHeartbeatPolicy { + public long getNextHeartbeatInterval(ApplicationAttemptId appAttemptId) { + if (!adaptiveHeartbeat) { + return defaultHeartbeatInterval; + } else { + // TODO: implement policy + return defaultHeartbeatInterval; + } + } + } + + @VisibleForTesting + public void initAdaptiveHeartbeat() { + Configuration conf = getConfig(); + + adaptiveHeartbeat = + conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT, + YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT); + defaultHeartbeatInterval = + conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS); + if (defaultHeartbeatInterval <= 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS + + " should be larger than 0."); + } + + this.heartbeatPolicy = new AdaptiveHeartbeatPolicy(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 16b6a89..c44ccc1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -91,7 +91,9 @@ private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; - private long nextHeartBeatInterval; + private boolean adaptiveHeartbeat; + private long defaultHeartbeatInterval; + private AdaptiveHeartbeatPolicy heartbeatPolicy; private Server server; private InetSocketAddress resourceTrackerAddress; private String minimumNodeManagerVersion; @@ -135,14 +137,18 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); RackResolver.init(conf); - nextHeartBeatInterval = + adaptiveHeartbeat = + conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT, + YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT); + defaultHeartbeatInterval = conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); - if (nextHeartBeatInterval <= 0) { + if (defaultHeartbeatInterval <= 0) { throw new YarnRuntimeException("Invalid Configuration. " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + " should be larger than 0."); } + heartbeatPolicy = new AdaptiveHeartbeatPolicy(); minAllocMb = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -171,7 +177,7 @@ protected void serviceStart() throws Exception { this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, conf, null, - conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, + conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); // Enable service authorization? @@ -190,9 +196,9 @@ protected void serviceStart() throws Exception { this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, - YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, - server.getListenerAddress()); + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + server.getListenerAddress()); } @Override @@ -438,10 +444,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } // Heartbeat response + long nextHeartbeatInterval = heartbeatPolicy.getNextHeartbeatInterval(); NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); + nextHeartbeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); @@ -547,4 +554,15 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private class AdaptiveHeartbeatPolicy { + public long getNextHeartbeatInterval() { + if (!adaptiveHeartbeat) { + return defaultHeartbeatInterval; + } else { + // TODO: implement policy + return defaultHeartbeatInterval; + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index ca5c7a4..d0856e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -68,6 +68,35 @@ public static void setup() { ResourceScheduler.class); } + @Test(timeout = 50000) + public void testGetNextHeartBeatInterval() throws Exception { + conf.set(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, "7500"); + MockRM rm = new MockRM(conf); + rm.start(); + + // Init the conf and the adaptiveHeartbeatPolicy + rm.getApplicationMasterService().initAdaptiveHeartbeat(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + am1.addRequests(new String[]{"127.0.0.1"}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + Assert.assertEquals(7500L, alloc1Response.getNextHeartbeatInterval()); + + rm.stop(); + } + @Test(timeout = 3000000) public void testRMIdentifierOnContainerAllocation() throws Exception { MockRM rm = new MockRM(conf);