diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java index 7d9c7d3..264d03a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.util; import java.io.IOException; +import java.math.BigInteger; import java.util.HashMap; import java.util.Map; @@ -48,7 +49,12 @@ private long cpuTimeMs = UNAVAILABLE; private Map processTree = new HashMap(); - + + /** Track CPU utilization. */ + private final CpuTimeTracker cpuTimeTracker; + /** Clock to account for CPU utilization. */ + private Clock clock; + public static boolean isAvailable() { if (Shell.WINDOWS) { ShellCommandExecutor shellExecutor = new ShellCommandExecutor( @@ -68,9 +74,25 @@ public static boolean isAvailable() { return false; } - public WindowsBasedProcessTree(String pid) { + /** + * Create a monitor for a Windows process tree. + * @param pid Identifier of the job object. + */ + public WindowsBasedProcessTree(final String pid) { + this(pid, new SystemClock()); + } + + /** + * Create a monitor for a Windows process tree. + * @param pid Identifier of the job object. + * @param pClock Clock to keep track of time for CPU utilization. + */ + public WindowsBasedProcessTree(final String pid, final Clock pClock) { super(pid); - taskProcessId = pid; + this.taskProcessId = pid; + this.clock = pClock; + // Instead of jiffies, Windows uses milliseconds directly; 1ms = 1 jiffy + this.cpuTimeTracker = new CpuTimeTracker(1L); } // helper method to override while testing @@ -209,7 +231,7 @@ public long getRssMemorySize(int olderThanAge) { } return total; } - + @Override @SuppressWarnings("deprecation") public long getCumulativeRssmem(int olderThanAge) { @@ -227,9 +249,27 @@ public long getCumulativeCpuTime() { return cpuTimeMs; } + /** + * Get the number of used ms for all the processes under the monitored job + * object. + * @return Total consumed milliseconds by all processes in the job object. + */ + private BigInteger getTotalProcessMs() { + long totalMs = 0; + for (ProcessInfo p : processTree.values()) { + if (p != null) { + totalMs += p.cpuTimeMs; + } + } + return BigInteger.valueOf(totalMs); + } + @Override - public float getCpuUsagePercent() { - return CpuTimeTracker.UNAVAILABLE; + public final float getCpuUsagePercent() { + BigInteger processTotalMs = getTotalProcessMs(); + cpuTimeTracker.updateElapsedJiffies(processTotalMs, clock.getTime()); + + return cpuTimeTracker.getCpuTrackerUsagePercent(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java index 80c5b02..fa2adc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java @@ -21,8 +21,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.Shell; - +import org.junit.Assert; import org.junit.Test; + import static org.junit.Assert.assertTrue; public class TestWindowsBasedProcessTree { @@ -31,8 +32,9 @@ class WindowsBasedProcessTreeTester extends WindowsBasedProcessTree { String infoStr = null; - public WindowsBasedProcessTreeTester(String pid) { - super(pid); + + public WindowsBasedProcessTreeTester(String pid, Clock clock) { + super(pid, clock); } @Override String getAllProcessInfoFromShell() { @@ -49,8 +51,11 @@ public void tree() { } assertTrue("WindowsBasedProcessTree should be available on Windows", WindowsBasedProcessTree.isAvailable()); + ControlledClock testClock = new ControlledClock(new SystemClock()); + long elapsedTimeBetweenUpdatesMsec = 0; + testClock.setTime(elapsedTimeBetweenUpdatesMsec); - WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1"); + WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1", testClock); pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n"; pTree.updateProcessTree(); assertTrue(pTree.getVirtualMemorySize() == 2048); @@ -63,8 +68,11 @@ public void tree() { assertTrue(pTree.getRssMemorySize(0) == 2048); assertTrue(pTree.getCumulativeRssmem(0) == 2048); assertTrue(pTree.getCumulativeCpuTime() == 1000); + assertTrue(pTree.getCpuUsagePercent() == ResourceCalculatorProcessTree.UNAVAILABLE); pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n"; + elapsedTimeBetweenUpdatesMsec = 1000; + testClock.setTime(elapsedTimeBetweenUpdatesMsec); pTree.updateProcessTree(); assertTrue(pTree.getVirtualMemorySize() == 3072); assertTrue(pTree.getCumulativeVmem() == 3072); @@ -75,8 +83,13 @@ public void tree() { assertTrue(pTree.getRssMemorySize(1) == 2048); assertTrue(pTree.getCumulativeRssmem(1) == 2048); assertTrue(pTree.getCumulativeCpuTime() == 3000); + assertTrue(pTree.getCpuUsagePercent() == 200); + Assert.assertEquals("Percent CPU time is not correct", + pTree.getCpuUsagePercent(), 200, 0.01); pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n"; + elapsedTimeBetweenUpdatesMsec = 2000; + testClock.setTime(elapsedTimeBetweenUpdatesMsec); pTree.updateProcessTree(); assertTrue(pTree.getVirtualMemorySize() == 2048); assertTrue(pTree.getCumulativeVmem() == 2048); @@ -87,5 +100,7 @@ public void tree() { assertTrue(pTree.getRssMemorySize(2) == 2048); assertTrue(pTree.getCumulativeRssmem(2) == 2048); assertTrue(pTree.getCumulativeCpuTime() == 4000); + Assert.assertEquals("Percent CPU time is not correct", + pTree.getCpuUsagePercent(), 0, 0.01); } }