diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index c4fdb79..afd882a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -151,6 +151,18 @@ public static AllocateResponse newInstance(int responseId, public abstract void setResponseId(int responseId); /** + * Get the next heartbeat interval. + * @return next heartbeat interval + */ + @Public + @Stable + public abstract long getNextHeartbeatInterval(); + + @Private + @Unstable + public abstract void setNextHeartbeatInterval(long nextHeartbeatInterval); + + /** * Get the list of newly allocated Container by the * ResourceManager. * @return list of newly allocated Container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 48a75c0..467a61c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -338,11 +338,51 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_RM_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; - /** RM set next Heartbeat interval for NM */ + /** Adaptive heartbeat interval */ + public static final String ADAPTIVE_HEARTBEAT = + RM_PREFIX + "adaptive-heartbeat.enable"; + public static final boolean DEFAULT_ADAPTIVE_HEARTBEAT = false; + + /** RM set default next Heartbeat interval for AM */ + public static final String RM_AM_HEARTBEAT_INTERVAL_MS = + RM_PREFIX + "applicationmasters.heartbeat-interval-ms"; + public static final long DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS = 1000; + + /** Heartbeat delay ratio for AM */ + public static final String RM_AM_HEARTBEAT_DELAY_RATIO = + RM_PREFIX + "applicationmasters.heartbeat-delay-ratio"; + public static final float DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO = 0.5f; + + /** The 'Gini Coefficient' of heartbeat delay among AMs */ + public static final String RM_AM_HEARTBEAT_DELAY_GINI = + RM_PREFIX + "applicationmasters.heartbeat-delay-gini"; + public static final float DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI = 2.0f; + + /** The upper limit of the AM heratbeat interval */ + public static final String RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS = + RM_PREFIX + "applicationmasters.heartbeat-upper-limit"; + public static final long DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS = 10000; + + /** RM set default next Heartbeat interval for NM */ public static final String RM_NM_HEARTBEAT_INTERVAL_MS = RM_PREFIX + "nodemanagers.heartbeat-interval-ms"; public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000; + /** Heartbeat delay ratio for NM */ + public static final String RM_NM_HEARTBEAT_DELAY_RATIO = + RM_PREFIX + "nodemanagers.heartbeat-delay-ratio"; + public static final float DEFAULT_RM_NM_HEARTBEAT_DELAY_RATIO = 1.0f; + + /** The 'Gini Coefficient' of heartbeat delay among NMs */ + public static final String RM_NM_HEARTBEAT_DELAY_GINI = + RM_PREFIX + "nodemanagers.heartbeat-delay-gini"; + public static final float DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI = 2.0f; + + /** The upper limit of the NM heratbeat interval */ + public static final String RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS = + RM_PREFIX + "nodemanagers.heartbeat-upper-limit"; + public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS = 30000; + /** Number of worker threads that write the history data. */ public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 410b663..49708a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -87,6 +87,7 @@ message AllocateResponseProto { repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; + optional int64 nextHeartbeatInterval = 13; } enum SchedulerResourceTypes { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index f2796fd..48cd3ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -209,6 +209,18 @@ public synchronized void setResponseId(int responseId) { } @Override + public synchronized long getNextHeartbeatInterval() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.getNextHeartbeatInterval()); + } + + @Override + public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) { + maybeInitBuilder(); + builder.setNextHeartbeatInterval(nextHeartbeatInterval); + } + + @Override public synchronized Resource getAvailableResources() { if (this.limit != null) { return this.limit; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index cd1dacf..2a7ea33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -92,11 +93,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.SystemClock; + @SuppressWarnings("unchecked") @Private @@ -105,6 +110,11 @@ private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; + private boolean adaptiveHeartbeat; + private long defaultHeartbeatInterval; + private long heartbeatIntervalUpperLimit; + private float delayRatio; + private AdaptiveHeartbeatPolicy heartbeatPolicy; private InetSocketAddress bindAddress; private Server server; private final RecordFactory recordFactory = @@ -112,12 +122,14 @@ private final ConcurrentMap responseMap = new ConcurrentHashMap(); private final RMContext rmContext; + private volatile Clock clock; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; this.rmContext = rmContext; + this.clock = new SystemClock(); } @Override @@ -125,6 +137,40 @@ protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); + adaptiveHeartbeat = + conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT, + YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT); + defaultHeartbeatInterval = + conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS); + if (defaultHeartbeatInterval <= 0) { + defaultHeartbeatInterval = + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS; + LOG.warn("Invalid Configuration. " + + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS + + " should be larger than 0. Now use the default value."); + } + heartbeatIntervalUpperLimit = + conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS); + if (heartbeatIntervalUpperLimit <= 0) { + heartbeatIntervalUpperLimit = + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS; + LOG.warn("Invalid Configuration. " + + YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS + + " should be larger than 0. Now use the default value."); + } + delayRatio = + conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_RATIO, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO); + if (delayRatio < 0) { + delayRatio = YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO; + LOG.warn("Invalid Configuration. " + + YarnConfiguration.RM_AM_HEARTBEAT_DELAY_RATIO + + " should be 0 or positive. Now use the default value."); + } + heartbeatPolicy = new AdaptiveHeartbeatPolicy(); + InetSocketAddress masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_SCHEDULER_ADDRESS, @@ -463,6 +509,15 @@ public AllocateResponse allocate(AllocateRequest request) throw new InvalidApplicationMasterRequestException(message); } + long timestamp = clock.getTime(); + if (lock.getTimestamp() + lock.getNextHeartbeatInterval() < timestamp) { + lock.setTimestamp(timestamp); + } else { + // TODO: support out-of-band heartbeat? + // Now we allow any out-of-band heartbeat. + lock.setTimestamp(timestamp); + } + //filter illegal progress values float filteredProgress = request.getProgress(); if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY @@ -580,6 +635,12 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + // Set the adaptive heartbeat interval. + long nextHeartbeatInterval = + heartbeatPolicy.getNextHeartbeatInterval(appAttemptId); + allocateResponse.setNextHeartbeatInterval(nextHeartbeatInterval); + lock.setNextHeartbeatInterval(nextHeartbeatInterval); + // add preemption to the allocateResponse message (if any) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); @@ -702,22 +763,74 @@ protected void serviceStop() throws Exception { public static class AllocateResponseLock { private AllocateResponse response; - + private long timestamp = 0L; + private long nextHeartbeatInterval = 0L; + public AllocateResponseLock(AllocateResponse response) { this.response = response; } - + public synchronized AllocateResponse getAllocateResponse() { return response; } - + public synchronized void setAllocateResponse(AllocateResponse response) { this.response = response; } + + public synchronized long getTimestamp() { + return timestamp; + } + + public synchronized void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public synchronized long getNextHeartbeatInterval() { + return nextHeartbeatInterval; + } + + public synchronized void setNextHeartbeatInterval(long nextHeartbeatInterval) { + this.nextHeartbeatInterval = nextHeartbeatInterval; + } } @VisibleForTesting public Server getServer() { return this.server; } + + private class AdaptiveHeartbeatPolicy { + public long getNextHeartbeatInterval(ApplicationAttemptId appAttemptId) { + if (!adaptiveHeartbeat) { + return defaultHeartbeatInterval; + } else { + float standardDelay = + SchedulerMetrics.getMetrics().getNumWaitingEvents() * delayRatio; + long interval = defaultHeartbeatInterval + (long) (standardDelay + * rScheduler.getHeartbeatDelayRescale(appAttemptId)); + return interval < heartbeatIntervalUpperLimit + ? interval : heartbeatIntervalUpperLimit; + } + } + } + + @VisibleForTesting + public void initAdaptiveHeartbeat() { + Configuration conf = getConfig(); + + adaptiveHeartbeat = + conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT, + YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT); + defaultHeartbeatInterval = + conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_MS); + heartbeatIntervalUpperLimit = + conf.getLong(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS); + delayRatio = + conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_RATIO, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_RATIO); + this.heartbeatPolicy = new AdaptiveHeartbeatPolicy(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c209873..b719333 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -675,6 +675,10 @@ public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); + SchedulerMetrics.getMetrics().decrNumWaitingEvents(); + if (event.getType().equals(SchedulerEventType.NODE_UPDATE)) { + SchedulerMetrics.getMetrics().decrNumWaitingNodeUpdateEvents(); + } } catch (InterruptedException e) { LOG.error("Returning, interrupted : " + e); return; // TODO: Kill RM. @@ -727,6 +731,10 @@ public void handle(SchedulerEvent event) { + remCapacity); } this.eventQueue.put(event); + SchedulerMetrics.getMetrics().incrNumWaitingEvents(); + if (event.getType().equals(SchedulerEventType.NODE_UPDATE)) { + SchedulerMetrics.getMetrics().incrNumWaitingNodeUpdateEvents(); + } } catch (InterruptedException e) { LOG.info("Interrupted. Trying to exit gracefully."); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 16b6a89..fac001c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -91,7 +92,11 @@ private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; - private long nextHeartBeatInterval; + private boolean adaptiveHeartbeat; + private long defaultHeartbeatInterval; + private long heartbeatIntervalUpperLimit; + private float delayRatio; + private AdaptiveHeartbeatPolicy heartbeatPolicy; private Server server; private InetSocketAddress resourceTrackerAddress; private String minimumNodeManagerVersion; @@ -135,14 +140,39 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); RackResolver.init(conf); - nextHeartBeatInterval = + adaptiveHeartbeat = + conf.getBoolean(YarnConfiguration.ADAPTIVE_HEARTBEAT, + YarnConfiguration.DEFAULT_ADAPTIVE_HEARTBEAT); + defaultHeartbeatInterval = conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); - if (nextHeartBeatInterval <= 0) { - throw new YarnRuntimeException("Invalid Configuration. " + if (defaultHeartbeatInterval <= 0) { + defaultHeartbeatInterval = + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS; + LOG.warn("Invalid Configuration. " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS - + " should be larger than 0."); + + " should be larger than 0. Now use the default value."); } + heartbeatIntervalUpperLimit = + conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS); + if (heartbeatIntervalUpperLimit <= 0) { + heartbeatIntervalUpperLimit = + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS; + LOG.warn("Invalid Configuration. " + + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_UPPER_LIMIT_MS + + " should be larger than 0. Now use the default value."); + } + delayRatio = + conf.getFloat(YarnConfiguration.RM_NM_HEARTBEAT_DELAY_RATIO, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_RATIO); + if (delayRatio < 0) { + delayRatio = YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_RATIO; + LOG.warn("Invalid Configuration. " + + YarnConfiguration.RM_NM_HEARTBEAT_DELAY_RATIO + + " should be 0 or positive. Now use the default value."); + } + heartbeatPolicy = new AdaptiveHeartbeatPolicy(); minAllocMb = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -171,7 +201,7 @@ protected void serviceStart() throws Exception { this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, conf, null, - conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, + conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); // Enable service authorization? @@ -190,9 +220,9 @@ protected void serviceStart() throws Exception { this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, - YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, - server.getListenerAddress()); + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + server.getListenerAddress()); } @Override @@ -438,10 +468,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } // Heartbeat response + long nextHeartbeatInterval = heartbeatPolicy.getNextHeartbeatInterval(nodeId); NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); + nextHeartbeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); @@ -547,4 +578,19 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + private class AdaptiveHeartbeatPolicy { + public long getNextHeartbeatInterval(NodeId nodeId) { + if (!adaptiveHeartbeat) { + return defaultHeartbeatInterval; + } else { + float standardDelay = + SchedulerMetrics.getMetrics().getNumWaitingNodeUpdateEvents() * delayRatio; + long interval = defaultHeartbeatInterval + (long) (standardDelay * + rmContext.getScheduler().getHeartbeatDelayRescale(nodeId)); + return interval < heartbeatIntervalUpperLimit + ? interval : heartbeatIntervalUpperLimit; + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java new file mode 100644 index 0000000..b275387 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import static org.apache.hadoop.metrics2.lib.Interns.info; + +import java.util.concurrent.atomic.AtomicBoolean; + + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class SchedulerMetrics { + + private static AtomicBoolean isInitialized = new AtomicBoolean(false); + + @Metric("# of waiting scheduler events") + MutableGaugeInt numWaitingEvents; + @Metric("# of waiting node update events") + MutableGaugeInt numWaitingNodeUpdateEvents; + + private static final MetricsInfo RECORD_INFO = + info("SchedulerMetrics", "Metrics for the Yarn Scheduler"); + private static volatile SchedulerMetrics INSTANCE = null; + private static MetricsRegistry registry; + + // SchedulerMetrics should be a singleton. + private SchedulerMetrics() {}; + + public static SchedulerMetrics getMetrics() { + if(!isInitialized.get()){ + synchronized (SchedulerMetrics.class) { + if(INSTANCE == null){ + INSTANCE = new SchedulerMetrics(); + registerMetrics(); + isInitialized.set(true); + } + } + } + return INSTANCE; + } + + private static void registerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ResourceManager"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("SchedulerMetrics", "Metrics for the Yarn Scheduler", + INSTANCE); + } + } + + // Waiting scheduler events + public int getNumWaitingEvents() { + return numWaitingEvents.value(); + } + + public void incrNumWaitingEvents() { + numWaitingEvents.incr(); + } + + public void decrNumWaitingEvents() { + numWaitingEvents.decr(); + } + + public void setNumWaitingEvents(int num) { + numWaitingEvents.set(num); + } + + public int getNumWaitingNodeUpdateEvents() { + return numWaitingNodeUpdateEvents.value(); + } + + public void decrNumWaitingNodeUpdateEvents() { + numWaitingNodeUpdateEvents.decr(); + } + + public void incrNumWaitingNodeUpdateEvents() { + numWaitingNodeUpdateEvents.incr(); + } + + public void setNumWaitingNodeUpdateEvents(int num) { + numWaitingNodeUpdateEvents.set(num); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b99b217..5b3512c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -286,4 +286,14 @@ void setEntitlement(String queue, QueueEntitlement entitlement) * @return an EnumSet containing the resource types */ public EnumSet getSchedulingResourceTypes(); + + /** + * Return a suggested heartbeat delay scale, 1.0 if respect standard delay. + * @param obj to which attempt the interval is set, + * support {@link ApplicationAttemptId} and {@link NodeId}. + * @return the value of the heartbeat interval + */ + @LimitedPrivate("yarn") + @Unstable + float getHeartbeatDelayRescale(Object obj); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 48c7f2f..0f568bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -43,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.util.SampleStat; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; @@ -136,6 +137,12 @@ // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; + private SampleStat[] amHeartbeatRescaleFactors = new SampleStat[1]; + private SampleStat amHeartbeatRescale = new SampleStat(); + private SampleStat nodeHeartbeatRescale = new SampleStat(); + private float amMaxScale; + private float nodeMaxScale; + static final Comparator nonPartitionedQueueComparator = new Comparator() { @Override @@ -325,6 +332,21 @@ private synchronized void initScheduler(Configuration configuration) throws asyncSchedulerThread = new AsyncScheduleThread(this); } + amMaxScale = conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI); + if (amMaxScale < 1) { + amMaxScale = YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI; + LOG.warn("Deployed YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI should" + + " larger than 1, now use the default value."); + } + nodeMaxScale = conf.getFloat(YarnConfiguration.RM_NM_HEARTBEAT_DELAY_GINI, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI); + if (nodeMaxScale < 1) { + nodeMaxScale = YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI; + LOG.warn("Deployed YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI" + + " should larger than 1, now use the default value."); + } + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + @@ -1781,4 +1803,45 @@ public SchedulerHealth getSchedulerHealth() { private synchronized void setLastNodeUpdateTime(long time) { this.lastNodeUpdateTime = time; } + + @Override + public float getHeartbeatDelayRescale(Object obj) { + if (obj instanceof ApplicationAttemptId) { + FiCaSchedulerApp appAttempt = getApplicationAttempt((ApplicationAttemptId) obj); + if (appAttempt != null) { + float capacity = + appAttempt.getQueue().getQueueInfo(false, false).getCapacity(); + amHeartbeatRescaleFactors[0].add(capacity); + float reScale = + normalizeScale(amHeartbeatRescaleFactors[0], capacity, amMaxScale); + amHeartbeatRescale.add(reScale); + return normalizeScale(amHeartbeatRescale, reScale, amMaxScale); + } + } else if (obj instanceof NodeId) { + // [need more thinking] We may not discriminate nodes as what we do for apps, + // because nodes are not as variant as apps, and it's hard to determine the next delay + // of a node heartbeat by any expectation or criterion. However, we random the scale + // in a range to avoid clustered heartbeat around particular time intervals. + double rand = Math.random(); + return (float) ((rand < 0.5) ? (1 / nodeMaxScale + (1 - 1 / nodeMaxScale) * rand / 0.5) + : (1 + (nodeMaxScale - 1) * (rand - 0.5) / 0.5)); + } else { + // it should not arrive here. + } + return 1.0f; + } + + private float normalizeScale(SampleStat stat, float value, float maxScale) { + double min = stat.min(); + double max = stat.max(); + double mean = stat.mean(); + // If value is between min and mean, we map it to interval [1/amMaxScale, 1], + // and if value is between mean and max, we map it to interval [1, amMaxScale]. + if (value < mean) { + return (float) (1 / maxScale + (1 - 1 / maxScale) + * (value - min) / (mean - min)); + } else { + return (float) (1 + (maxScale - 1) * (value - mean) / (max - mean)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de5..7417039 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.util.SampleStat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -185,6 +187,12 @@ // heartbeat protected int maxAssign; // Max containers to assign per heartbeat + private SampleStat[] amHeartbeatRescaleFactors = new SampleStat[3]; + private SampleStat amHeartbeatRescale = new SampleStat(); + private SampleStat nodeHeartbeatRescale = new SampleStat(); + private float amMaxScale; + private float nodeMaxScale; + @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; @@ -1325,6 +1333,21 @@ private void initScheduler(Configuration conf) throws IOException { waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); usePortForNodeName = this.conf.getUsePortForNodeName(); + amMaxScale = conf.getFloat(YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI, + YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI); + if (amMaxScale < 1) { + amMaxScale = YarnConfiguration.DEFAULT_RM_AM_HEARTBEAT_DELAY_GINI; + LOG.warn("Deployed YarnConfiguration.RM_AM_HEARTBEAT_DELAY_GINI should" + + " larger than 1, now use the default value."); + } + nodeMaxScale = conf.getFloat(YarnConfiguration.RM_NM_HEARTBEAT_DELAY_GINI, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI); + if (nodeMaxScale < 1) { + nodeMaxScale = YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI; + LOG.warn("Deployed YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_DELAY_GINI" + + " should larger than 1, now use the default value."); + } + updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; @@ -1689,4 +1712,57 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return targetQueueName; } + + @Override + public float getHeartbeatDelayRescale(Object obj) { + if (obj instanceof ApplicationAttemptId) { + FSAppAttempt appAttempt = getSchedulerApp((ApplicationAttemptId) obj); + if (appAttempt != null) { + int priority = appAttempt.getPriority().getPriority() + * appAttempt.getQueue().getPriority().getPriority(); + amHeartbeatRescaleFactors[0].add(priority); + float weight = appAttempt.getWeights().getWeight(ResourceType.MEMORY) + * appAttempt.getQueue().getWeights().getWeight(ResourceType.MEMORY); + amHeartbeatRescaleFactors[1].add(weight); + Resource starvation = + Resources.min(RESOURCE_CALCULATOR, clusterResource, + Resources.subtract(appAttempt.getFairShare(), + appAttempt.getCurrentConsumption()), + appAttempt.getAppAttemptResourceUsage().getPending()); + amHeartbeatRescaleFactors[2].add(starvation.getMemory()); + + float reScale = + normalizeScale(amHeartbeatRescaleFactors[0], priority, amMaxScale) + / normalizeScale(amHeartbeatRescaleFactors[1], weight, amMaxScale) + / normalizeScale(amHeartbeatRescaleFactors[2], starvation.getMemory(), amMaxScale); + amHeartbeatRescale.add(reScale); + return normalizeScale(amHeartbeatRescale, reScale, amMaxScale); + } + } else if (obj instanceof NodeId) { + // [need more thinking] We may not discriminate nodes as what we do for apps, + // because nodes are not as variant as apps, and it's hard to determine the next delay + // of a node heartbeat by any expectation or criterion. However, we random the scale + // in a range to avoid clustered heartbeat around particular time intervals. + double rand = Math.random(); + return (float) ((rand < 0.5) ? (1 / nodeMaxScale + (1 - 1 / nodeMaxScale) * rand / 0.5) + : (1 + (nodeMaxScale - 1) * (rand - 0.5) / 0.5)); + } else { + // it should not arrive here. + } + return 1.0f; + } + + private float normalizeScale(SampleStat stat, float value, float maxScale) { + double min = stat.min(); + double max = stat.max(); + double mean = stat.mean(); + // If value is between min and mean, we map it to interval [1/amMaxScale, 1], + // and if value is between mean and max, we map it to interval [1, amMaxScale]. + if (value < mean) { + return (float) (1 / maxScale + (1 - 1 / maxScale) + * (value - min) / (mean - min)); + } else { + return (float) (1 + (maxScale - 1) * (value - mean) / (max - mean)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b8c419c..e542d89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -983,4 +983,9 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, public Resource getUsedResource() { return usedResource; } + + @Override + public float getHeartbeatDelayRescale(Object obj) { + return 1.0f; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index ca5c7a4..d0856e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -68,6 +68,35 @@ public static void setup() { ResourceScheduler.class); } + @Test(timeout = 50000) + public void testGetNextHeartBeatInterval() throws Exception { + conf.set(YarnConfiguration.RM_AM_HEARTBEAT_INTERVAL_MS, "7500"); + MockRM rm = new MockRM(conf); + rm.start(); + + // Init the conf and the adaptiveHeartbeatPolicy + rm.getApplicationMasterService().initAdaptiveHeartbeat(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + am1.addRequests(new String[]{"127.0.0.1"}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + Assert.assertEquals(7500L, alloc1Response.getNextHeartbeatInterval()); + + rm.stop(); + } + @Test(timeout = 3000000) public void testRMIdentifierOnContainerAllocation() throws Exception { MockRM rm = new MockRM(conf);