diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index f0786da..1ca78d5 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -71,7 +71,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.MonotonicClock; /** * This class provides a way to interact with history files in a thread safe @@ -505,7 +505,7 @@ protected void serviceInit(Configuration conf) throws Exception { long maxFSWaitTime = conf.getLong( JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); - createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); + createHistoryDirs(new MonotonicClock(), 10 * 1000, maxFSWaitTime); this.aclsMgr = new JobACLsManager(conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/MonotonicClock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/MonotonicClock.java new file mode 100644 index 0000000..9a8c186 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/MonotonicClock.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util; + +/** + * Implementation of {@link Clock} that can be used to measure a period of time. + */ +public class MonotonicClock implements Clock { + + private static final long NANOSECONDS_PER_MILLISECOND = 1000000; + // nanoTime-derived millis minus wall-clock millis, captured at construction. + private final long gap; + + // State of the inner timer; these fields are not synchronized. + private boolean timing = false; + private long start = -1; + + public MonotonicClock() { + gap = System.nanoTime() / NANOSECONDS_PER_MILLISECOND - System.currentTimeMillis(); + } + + /** + * Get time from the clock. The time is ensured to be monotonic, which means + * it is not disturbed in the lifetime of the clock. The clock is calibrated + * with the System clock when the clock is initialized, but note that the + * returned value may differ from that of System.currentTimeMillis, + * because a) the System clock may be updated by NTP, and b) the monotonic + * clock accumulates error. It is not good practice to use it to + * get the wall-clock time. + * @return time in milliseconds. + */ + @Override + public long getTime() { + return System.nanoTime() / NANOSECONDS_PER_MILLISECOND - gap; + } + + /** + * Start inner timer. Note: the inner timer is not thread-safe. 
+ */ + public void startTimer() { + timing = true; + this.start = getTime(); + } + + /** + * Stop timer and get the result. + * @return the interval in milliseconds. + * @throws IllegalStateException if {@link #startTimer()} was not called first. + */ + public long stopTimerAndGetResult() { + if (!timing) { + throw new IllegalStateException("Method 'startTimer()' should be called" + + " ahead of 'stopTimerAndGetResult()'"); + } + timing = false; + return getTime() - start; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 3c76596..9e24e48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -78,7 +78,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.MonotonicClock; public class ContainerImpl implements Container { @@ -98,7 +98,8 @@ private boolean wasLaunched; private long containerLocalizationStartTime; private long containerLaunchStartTime; - private static Clock clock = new SystemClock(); + // Monotonic clock used to measure elapsed time.
+ private static Clock clock = new MonotonicClock(); /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java index 0d71a9d..aeb4bbe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java @@ -33,13 +33,12 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.MonotonicClock; import java.io.*; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,6 +67,7 @@ private Map controllerPaths; private final ReadWriteLock rwLock; private final PrivilegedOperationExecutor privilegedOperationExecutor; + // Monotonic clock used to measure elapsed time. 
private final Clock clock; public CGroupsHandlerImpl(Configuration conf, PrivilegedOperationExecutor @@ -88,7 +88,7 @@ public CGroupsHandlerImpl(Configuration conf, PrivilegedOperationExecutor this.controllerPaths = new HashMap<>(); this.rwLock = new ReentrantReadWriteLock(); this.privilegedOperationExecutor = privilegedOperationExecutor; - this.clock = new SystemClock(); + this.clock = new MonotonicClock(); init(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index 6994fc3..5a4a1d0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; -import org.apache.hadoop.yarn.util.SystemClock; public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { @@ -79,7 +79,7 @@ private long deleteCgroupTimeout; private long deleteCgroupDelay; - // package private for testing purposes + // Monotonic clock used to measure elapsed time; package private for testing purposes Clock clock; private float yarnProcessors; @@ -87,7 +87,7 @@ public CgroupsLCEResourcesHandler() { this.controllerPaths = new HashMap(); -
clock = new SystemClock(); + clock = new MonotonicClock(); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 77df059..a062abc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -120,6 +120,7 @@ private RMContext rmContext; + // Monotonic clock used to measure elapsed time.
private final Clock clock; private double maxIgnoredOverCapacity; private long maxWaitTime; @@ -136,12 +137,12 @@ private RMNodeLabelsManager nlm; public ProportionalCapacityPreemptionPolicy() { - clock = new SystemClock(); + clock = new MonotonicClock(); } public ProportionalCapacityPreemptionPolicy(Configuration config, RMContext context, CapacityScheduler scheduler) { - this(config, context, scheduler, new SystemClock()); + this(config, context, scheduler, new MonotonicClock()); } public ProportionalCapacityPreemptionPolicy(Configuration config, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java index 76331bf..b03fc69 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java @@ -25,14 +25,14 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.MonotonicClock; public class AMLivelinessMonitor extends AbstractLivelinessMonitor { private EventHandler dispatcher; public AMLivelinessMonitor(Dispatcher d) { - super("AMLivelinessMonitor", new SystemClock()); + super("AMLivelinessMonitor", new MonotonicClock()); this.dispatcher = d.getEventHandler(); } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index dab6d9f..9ad4825 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -71,6 +71,7 @@ public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + // Monotonic clock used to measure elapsed time.
private final Clock clock; private long lastSuccessfulReload; // Last time we successfully reloaded queues @@ -88,7 +89,7 @@ private volatile boolean running = true; public AllocationFileLoaderService() { - this(new SystemClock()); + this(new MonotonicClock()); } public AllocationFileLoaderService(Clock clock) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3eefb8f..22aa830 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -122,6 +123,7 @@ private Resource incrAllocation; private QueueManager queueMgr; private volatile Clock clock; + private volatile Clock monotonicClock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -198,6 +200,7 @@ public FairScheduler() { super(FairScheduler.class.getName()); clock = new SystemClock(); + monotonicClock = new MonotonicClock(); allocsLoader = new 
AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -269,10 +272,10 @@ public void run() { synchronized (updateThreadMonitor) { updateThreadMonitor.wait(updateInterval); } - long start = getClock().getTime(); + long start = getMonotonicClock().getTime(); update(); preemptTasksIfNecessary(); - long duration = getClock().getTime() - start; + long duration = getMonotonicClock().getTime() - start; fsOpDurations.addUpdateThreadRunDuration(duration); } catch (InterruptedException ie) { LOG.warn("Update thread interrupted. Exiting."); @@ -310,7 +313,7 @@ public void run() { * required resources per job. */ protected synchronized void update() { - long start = getClock().getTime(); + long start = getMonotonicClock().getTime(); updateStarvationStats(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -336,7 +339,7 @@ protected synchronized void update() { } } - long duration = getClock().getTime() - start; + long duration = getMonotonicClock().getTime() - start; fsOpDurations.addUpdateCallDuration(duration); } @@ -391,7 +394,7 @@ protected synchronized void preemptTasksIfNecessary() { * We make sure that no queue is placed below its fair share in the process. 
*/ protected void preemptResources(Resource toPreempt) { - long start = getClock().getTime(); + long start = getMonotonicClock().getTime(); if (Resources.equals(toPreempt, Resources.none())) { return; } @@ -437,7 +440,7 @@ protected void preemptResources(Resource toPreempt) { } } - long duration = getClock().getTime() - start; + long duration = getMonotonicClock().getTime() - start; fsOpDurations.addPreemptCallDuration(duration); } @@ -574,6 +577,10 @@ public Clock getClock() { return clock; } + public Clock getMonotonicClock() { + return monotonicClock; + } + @VisibleForTesting void setClock(Clock clock) { this.clock = clock; @@ -975,7 +982,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, * Process a heartbeat update from a node. */ private synchronized void nodeUpdate(RMNode nm) { - long start = getClock().getTime(); + long start = getMonotonicClock().getTime(); if (LOG.isDebugEnabled()) { LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } @@ -1010,12 +1017,12 @@ private synchronized void nodeUpdate(RMNode nm) { attemptScheduling(node); } - long duration = getClock().getTime() - start; + long duration = getMonotonicClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); } void continuousSchedulingAttempt() throws InterruptedException { - long start = getClock().getTime(); + long start = getMonotonicClock().getTime(); List nodeIdList = new ArrayList(nodes.keySet()); // Sort the nodes by space available on them, so that we offer // containers on emptier nodes first, facilitating an even spread. This @@ -1039,7 +1046,7 @@ void continuousSchedulingAttempt() throws InterruptedException { } } - long duration = getClock().getTime() - start; + long duration = getMonotonicClock().getTime() - start; fsOpDurations.addContinuousSchedulingRunDuration(duration); }