diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java index 3f17c9a..95b030e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java @@ -28,28 +28,10 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class CpuTimeTracker { - public static final int UNAVAILABLE = -1; - private final long minimumTimeInterval; - - // CPU used time since system is on (ms) - private BigInteger cumulativeCpuTime = BigInteger.ZERO; - - // CPU used time read last time (ms) - private BigInteger lastCumulativeCpuTime = BigInteger.ZERO; - - // Unix timestamp while reading the CPU time (ms) - private long sampleTime; - private long lastSampleTime; - private float cpuUsage; - private BigInteger jiffyLengthInMillis; +public class CpuTimeTracker extends ResourceTimeTracker { public CpuTimeTracker(long jiffyLengthInMillis) { - this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis); - this.cpuUsage = UNAVAILABLE; - this.sampleTime = UNAVAILABLE; - this.lastSampleTime = UNAVAILABLE; - minimumTimeInterval = 10 * jiffyLengthInMillis; + super(jiffyLengthInMillis); } /** @@ -63,22 +45,9 @@ public CpuTimeTracker(long jiffyLengthInMillis) { * {@link CpuTimeTracker#minimumTimeInterval} apart */ public float getCpuTrackerUsagePercent() { - if (lastSampleTime == UNAVAILABLE || - lastSampleTime > sampleTime) { - // lastSampleTime > sampleTime may happen when the system time is changed - lastSampleTime = sampleTime; - lastCumulativeCpuTime = cumulativeCpuTime; - return cpuUsage; - } - // When lastSampleTime is sufficiently old, update cpuUsage. - // Also take a sample of the current time and cumulative CPU time for the - // use of the next calculation. 
- if (sampleTime > lastSampleTime + minimumTimeInterval) { - cpuUsage = - ((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue()) - * 100F / ((float) (sampleTime - lastSampleTime)); - lastSampleTime = sampleTime; - lastCumulativeCpuTime = cumulativeCpuTime; + float cpuUsage = super.getResourceTrackerUsage(); + if (cpuUsage != UNAVAILABLE) { + cpuUsage = cpuUsage * 100F; } return cpuUsage; } @@ -88,7 +57,7 @@ public float getCpuTrackerUsagePercent() { * @return cumulative CPU time in milliseconds */ public long getCumulativeCpuTime() { - return cumulativeCpuTime.longValue(); + return super.getCumulativeResource(); } /** @@ -97,19 +66,18 @@ public long getCumulativeCpuTime() { * @param newTime new sample time */ public void updateElapsedJiffies(BigInteger elapsedJiffies, long newTime) { - cumulativeCpuTime = elapsedJiffies.multiply(jiffyLengthInMillis); - sampleTime = newTime; + super.updateElapsedResource(elapsedJiffies, newTime); } @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("SampleTime " + this.sampleTime); - sb.append(" CummulativeCpuTime " + this.cumulativeCpuTime); - sb.append(" LastSampleTime " + this.lastSampleTime); - sb.append(" LastCummulativeCpuTime " + this.lastCumulativeCpuTime); - sb.append(" CpuUsage " + this.cpuUsage); - sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis); + sb.append("SampleTime " + sampleTime); + sb.append(" CummulativeCpuTime " + cumulativeResource); + sb.append(" LastSampleTime " + lastSampleTime); + sb.append(" LastCummulativeCpuTime " + lastCumulativeResource); + sb.append(" CpuUsage " + resourceUsage); + sb.append(" JiffyLengthMillisec " + convertionFactor); return sb.toString(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ResourceTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ResourceTimeTracker.java new file mode 100644 index 0000000..bb24021 --- /dev/null +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ResourceTimeTracker.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.math.BigInteger; + +/** + * Utility for sampling and computing resource usage. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ResourceTimeTracker { + + /** Resource or value is unavailable. */ + public static final int UNAVAILABLE = -1; + + /** Minimum interval between two samples in (ms). */ + private final long minimumTimeInterval; + + /** Resource used time since system is on (ms). */ + protected BigInteger cumulativeResource = BigInteger.ZERO; + + /** Resource used time read last time (ms). */ + protected BigInteger lastCumulativeResource = BigInteger.ZERO; + + /** Time stamp while reading the resource value (ms). */ + protected long sampleTime; + protected long lastSampleTime; + + /** Calculate resource usage. */ + protected float resourceUsage; + + /** Conversion factor. 
*/ + protected BigInteger convertionFactor; + + public ResourceTimeTracker() { + this(1); + } + + public ResourceTimeTracker(long convertionFactor) { + this.convertionFactor = BigInteger.valueOf(convertionFactor); + this.resourceUsage = UNAVAILABLE; + this.sampleTime = UNAVAILABLE; + this.lastSampleTime = UNAVAILABLE; + this.minimumTimeInterval = 10 * convertionFactor; + } + + /** + * Return resource usage spent over the time since last update. Resource time + * spent is based on consumed resources. + * @return Return resource usage since last update per millisecond, {@link + * ResourceTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than + * {@link ResourceTimeTracker#minimumTimeInterval} apart + */ + public float getResourceTrackerUsage() { + if (lastSampleTime == UNAVAILABLE || lastSampleTime > sampleTime) { + // lastSampleTime > sampleTime may happen when the system time is changed + lastSampleTime = sampleTime; + lastCumulativeResource = cumulativeResource; + return resourceUsage; + } + // When lastSampleTime is sufficiently old, update resourceUsage + if (sampleTime > lastSampleTime + minimumTimeInterval) { + float diffResource = cumulativeResource.subtract( + lastCumulativeResource).floatValue(); + float diffTime = (float) (sampleTime - lastSampleTime); + resourceUsage = diffResource / diffTime; + + // Take a sample of the current time and cumulative resource for the + // use of the next calculation. + lastSampleTime = sampleTime; + lastCumulativeResource = cumulativeResource; + } + return resourceUsage; + } + + /** + * Return resource usage spent over the time since last update per second. + * Resource time spent is based on consumed resources. {@link + * ResourceTimeTracker#getResourceTrackerUsage()} + * @return Resource usage per second. 
+ */ + public float getResourceTrackerUsagePerSec() { + float usagePerMs = getResourceTrackerUsage(); + if (usagePerMs == UNAVAILABLE) { + return UNAVAILABLE; + } + return usagePerMs * 1000F; + } + + /** + * Obtain the cumulative resource usage since the system is on. + * @return cumulative resource usage. + */ + public long getCumulativeResource() { + return cumulativeResource.longValue(); + } + + /** + * Update the cumulative amount of resource consumed. + * @param elapsedResource Updated consumed resources. + * @param pSampleTime New sample time. + */ + public void updateElapsedResource(long elapsedResource, long pSampleTime) { + updateElapsedResource(BigInteger.valueOf(elapsedResource), pSampleTime); + } + /** + * Update the cumulative amount of resource consumed. + * @param elapsedResource Updated consumed resources. + * @param pSampleTime New sample time. + */ + public void updateElapsedResource( + BigInteger elapsedResource, long pSampleTime) { + this.cumulativeResource = elapsedResource.multiply(convertionFactor); + this.sampleTime = pSampleTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("SampleTime " + this.sampleTime); + sb.append(" CummulativeResource " + this.cumulativeResource); + sb.append(" LastSampleTime " + this.lastSampleTime); + sb.append(" LastCummulativeResource " + this.lastCumulativeResource); + sb.append(" ResourceUsage " + this.resourceUsage); + sb.append(" JiffyLengthMillisec " + this.convertionFactor); + return sb.toString(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java index e8a5714..09790de1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -128,6 +128,18 @@ public static SysInfo
newInstance() { public abstract long getNetworkBytesWritten(); /** + * Obtain the aggregated number of bytes per second read over the network. + * @return total number of bytes per second read. + */ + public abstract float getNetworkBytesPerSecRead(); + + /** + * Obtain the aggregated number of bytes per second written to the network. + * @return total number of bytes per second written. + */ + public abstract float getNetworkBytesPerSecWritten(); + + /** * Obtain the aggregated number of bytes read from disks. * * @return total number of bytes read. @@ -141,4 +153,18 @@ public static SysInfo newInstance() { */ public abstract long getStorageBytesWritten(); + /** + * Obtain the aggregated number of bytes per second read from disks. + * + * @return total number of bytes per second read. + */ + public abstract float getStorageBytesPerSecRead(); + + /** + * Obtain the aggregated number of bytes per second written to disks. + * + * @return total number of bytes per second written. + */ + public abstract float getStorageBytesPerSecWritten(); + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java index bba1631..3a786c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -100,6 +100,8 @@ "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*"); + private ResourceTimeTracker netReadTimeTracker; + private ResourceTimeTracker netWriteTimeTracker; /** * Pattern for parsing /proc/diskstats. 
@@ -111,6 +113,9 @@ "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)"); + private ResourceTimeTracker diskReadTimeTracker; + private ResourceTimeTracker diskWriteTimeTracker; + /** * Pattern for parsing /sys/block/partition_name/queue/hw_sector_size. */ @@ -207,6 +212,10 @@ public SysInfoLinux(String procfsMemFile, this.jiffyLengthInMillis = jiffyLengthInMillis; this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); this.perDiskSectorSize = new HashMap(); + this.netReadTimeTracker = new ResourceTimeTracker(); + this.netWriteTimeTracker = new ResourceTimeTracker(); + this.diskReadTimeTracker = new ResourceTimeTracker(); + this.diskWriteTimeTracker = new ResourceTimeTracker(); } /** @@ -437,6 +446,12 @@ private void readProcNetInfoFile() { } numNetBytesRead += Long.parseLong(mat.group(2)); numNetBytesWritten += Long.parseLong(mat.group(10)); + + // Update values to calculate utilization + netReadTimeTracker.updateElapsedResource( + numNetBytesRead, getCurrentTime()); + netWriteTimeTracker.updateElapsedResource( + numNetBytesWritten, getCurrentTime()); } str = in.readLine(); } @@ -507,6 +522,12 @@ private void readProcDisksInfoFile() { } numDisksBytesRead += Long.parseLong(sectorsRead) * sectorSize; numDisksBytesWritten += Long.parseLong(sectorsWritten) * sectorSize; + + // Update values to calculate utilization + diskReadTimeTracker.updateElapsedResource( + numDisksBytesRead, getCurrentTime()); + diskWriteTimeTracker.updateElapsedResource( + numDisksBytesWritten, getCurrentTime()); } str = in.readLine(); } @@ -626,7 +647,7 @@ public long getCpuFrequency() { @Override public long getCumulativeCpuTime() { readProcStatFile(); - return cpuTimeTracker.getCumulativeCpuTime(); + return cpuTimeTracker.getCumulativeResource(); } /** {@inheritDoc} */ @@ -665,18 +686,44 @@ public long getNetworkBytesWritten() { return numNetBytesWritten; } + /** 
{@inheritDoc} */ + @Override + public float getNetworkBytesPerSecRead() { + return netReadTimeTracker.getResourceTrackerUsagePerSec(); + } + + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecWritten() { + return netWriteTimeTracker.getResourceTrackerUsagePerSec(); + } + + /** {@inheritDoc} */ @Override public long getStorageBytesRead() { readProcDisksInfoFile(); return numDisksBytesRead; } + /** {@inheritDoc} */ @Override public long getStorageBytesWritten() { readProcDisksInfoFile(); return numDisksBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecRead() { + return diskReadTimeTracker.getResourceTrackerUsagePerSec(); + } + + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecWritten() { + return diskWriteTimeTracker.getResourceTrackerUsagePerSec(); + } + /** * Test the {@link SysInfoLinux}. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java index e21adac..02ca2bd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -45,9 +45,13 @@ private long cumulativeCpuTimeMs; private float cpuUsage; private long storageBytesRead; + private float storageBytesPerSecRead; private long storageBytesWritten; + private float storageBytesPerSecWritten; private long netBytesRead; + private float netBytesPerSecRead; private long netBytesWritten; + private float netBytesPerSecWritten; private long lastRefreshTime; static final int REFRESH_INTERVAL_MS = 1000; @@ -72,9 +76,13 @@ void reset() { cumulativeCpuTimeMs = -1; cpuUsage = -1; storageBytesRead = -1; + storageBytesPerSecRead = -1; storageBytesWritten = -1; + storageBytesPerSecWritten = -1; netBytesRead = -1; + netBytesPerSecRead = -1; netBytesWritten = -1; 
+ netBytesPerSecWritten = -1; } String getSystemInfoInfoFromShell() { @@ -96,6 +104,10 @@ void refreshIfNeeded() { long refreshInterval = now - lastRefreshTime; lastRefreshTime = now; long lastCumCpuTimeMs = cumulativeCpuTimeMs; + long lastStorageBytesRead = storageBytesRead; + long lastStorageBytesWritten = storageBytesWritten; + long lastNetBytesRead = netBytesRead; + long lastNetBytesWritten = netBytesWritten; reset(); String sysInfoStr = getSystemInfoInfoFromShell(); if (sysInfoStr != null) { @@ -125,6 +137,23 @@ void refreshIfNeeded() { cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs) * 100F / refreshInterval; } + if (lastStorageBytesRead != -1) { + storageBytesPerSecRead = (float)((storageBytesRead - + lastStorageBytesRead) * 1000F / refreshInterval); + } + if (lastStorageBytesWritten != -1) { + storageBytesPerSecWritten = (float)((storageBytesWritten - + lastStorageBytesWritten) * 1000F / refreshInterval); + } + if (lastNetBytesRead != -1) { + netBytesPerSecRead = (float)((netBytesRead - lastNetBytesRead) + * 1000F / refreshInterval); + } + if (lastNetBytesWritten != -1) { + netBytesPerSecWritten = + (float)((netBytesWritten - lastNetBytesWritten) + * 1000F / refreshInterval); + } } catch (NumberFormatException nfe) { LOG.warn("Error parsing sysInfo", nfe); } @@ -230,16 +259,46 @@ public long getNetworkBytesWritten() { return netBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecRead() { + refreshIfNeeded(); + return netBytesPerSecRead; + } + + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecWritten() { + refreshIfNeeded(); + return netBytesPerSecWritten; + } + + /** {@inheritDoc} */ @Override public long getStorageBytesRead() { refreshIfNeeded(); return storageBytesRead; } + /** {@inheritDoc} */ @Override public long getStorageBytesWritten() { refreshIfNeeded(); return storageBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecRead() { + refreshIfNeeded(); + 
return storageBytesPerSecRead; + } + + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecWritten() { + refreshIfNeeded(); + return storageBytesPerSecWritten; + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestResourceTimeTracker.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestResourceTimeTracker.java new file mode 100644 index 0000000..5b21558 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestResourceTimeTracker.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Test the tracker to calculate the resource utilization from cumulative + * values. + */ +public class TestResourceTimeTracker { + + /** + * Test a fake disk that provides number of bytes read. 
+ */ + @Test + public void testResourceTimeTracker() { + ResourceTimeTracker resourceTimeTracker = new ResourceTimeTracker(); + + // Setting the starting value at 1s to 1000 bytes read + resourceTimeTracker.updateElapsedResource(1000, 1 * 1000); + // As it's the first value, it should be unavailable + assertEquals(ResourceTimeTracker.UNAVAILABLE, + resourceTimeTracker.getResourceTrackerUsagePerSec(), 0); + + // At 2s, if the value didn't increase, we should have 0 bytes/sec + resourceTimeTracker.updateElapsedResource(1000, 2 * 1000); + assertEquals(0, resourceTimeTracker.getResourceTrackerUsagePerSec(), 0); + + // At 3s, if we read 1000 bytes, we should have 1000 bytes/second + resourceTimeTracker.updateElapsedResource(2000, 3 * 1000); + assertEquals(1000, resourceTimeTracker.getResourceTrackerUsagePerSec(), 0); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index 2ae4872..41d90c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -46,10 +46,25 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) { @Public @Unstable + public static ResourceUtilization newInstance(int pmem, int vmem, float cpu, + float disk, float net) { + ResourceUtilization utilization = + Records.newRecord(ResourceUtilization.class); + utilization.setPhysicalMemory(pmem); + utilization.setVirtualMemory(vmem); + utilization.setCPU(cpu); + utilization.setDisk(disk); + utilization.setNetwork(net); + return utilization; + } + + @Public + @Unstable public static ResourceUtilization newInstance( ResourceUtilization resourceUtil) { return 
newInstance(resourceUtil.getPhysicalMemory(), - resourceUtil.getVirtualMemory(), resourceUtil.getCPU()); + resourceUtil.getVirtualMemory(), resourceUtil.getCPU(), + resourceUtil.getDisk(), resourceUtil.getNetwork()); } /** @@ -106,6 +121,43 @@ public static ResourceUtilization newInstance( @Unstable public abstract void setCPU(float cpu); + /** + * Get disk utilization. + * + * @return disk utilization in MB/s + */ + @Public + @Unstable + public abstract float getDisk(); + + /** + * Set disk utilization. + * + * @param disk disk utilization in MB/s + */ + @Public + @Unstable + public abstract void setDisk(float disk); + + /** + * Get network utilization. + * + * @return network utilization in MB/s + */ + @Public + @Unstable + public abstract float getNetwork(); + + /** + * Set network utilization. + * + * @param net network utilization in MB/s + */ + @Public + @Unstable + public abstract void setNetwork(float net); + + @Override public int hashCode() { final int prime = 263167; @@ -113,6 +165,8 @@ public int hashCode() { result = prime * result + getVirtualMemory(); result = prime * result + getPhysicalMemory(); result = 31 * result + Float.valueOf(getCPU()).hashCode(); + result = 31 * result + Float.valueOf(getDisk()).hashCode(); + result = 31 * result + Float.valueOf(getNetwork()).hashCode(); return result; } @@ -130,7 +184,9 @@ public boolean equals(Object obj) { ResourceUtilization other = (ResourceUtilization) obj; if (getVirtualMemory() != other.getVirtualMemory() || getPhysicalMemory() != other.getPhysicalMemory() - || getCPU() != other.getCPU()) { + || getCPU() != other.getCPU() + || getDisk() != other.getDisk() + || getNetwork() != other.getNetwork()) { return false; } return true; @@ -139,7 +195,8 @@ public boolean equals(Object obj) { @Override public String toString() { return ""; + + ", vCores:" + getCPU() + ", disk:" + getDisk() + ", net:" + + getNetwork() + ">"; } /** diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fd..c7b5508 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -62,6 +62,8 @@ message ResourceUtilizationProto { optional int32 pmem = 1; optional int32 vmem = 2; optional float cpu = 3; + optional float disk = 4; + optional float network = 5; } message ResourceOptionProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java index e37adbe..cd3b656 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -91,6 +91,30 @@ public void setCPU(float cpu) { } @Override + public float getDisk() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return p.getDisk(); + } + + @Override + public void setDisk(float disk) { + maybeInitBuilder(); + builder.setDisk(disk); + } + + @Override + public float getNetwork() { + ResourceUtilizationProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getNetwork(); + } + + @Override + public void setNetwork(float net) { + maybeInitBuilder(); + builder.setNetwork(net); + } + + @Override public int compareTo(ResourceUtilization other) { int diff = this.getPhysicalMemory() - other.getPhysicalMemory(); if (diff == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 7b2ea56..3905100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -154,6 +154,22 @@ public long getNetworkBytesWritten() { } /** + * Obtain the aggregated number of bytes per second read over the network. + * @return total number of bytes per second read. + */ + public float getNetworkBytesPerSecRead() { + return sys.getNetworkBytesPerSecRead(); + } + + /** + * Obtain the aggregated number of bytes per second written to the network. + * @return total number of bytes per second written. + */ + public float getNetworkBytesPerSecWritten() { + return sys.getNetworkBytesPerSecWritten(); + } + + /** * Obtain the aggregated number of bytes read from disks. * * @return total number of bytes read. @@ -172,6 +188,24 @@ public long getStorageBytesWritten() { } /** + * Obtain the aggregated number of bytes per second read from disks. + * + * @return total number of bytes per second read. + */ + public float getStorageBytesPerSecRead() { + return sys.getStorageBytesPerSecRead(); + } + + /** + * Obtain the aggregated number of bytes per second written to disks. + * + * @return total number of bytes per second written.
+ */ + public float getStorageBytesPerSecWritten() { + return sys.getStorageBytesPerSecWritten(); + } + + /** * Create the ResourceCalculatorPlugin from the class name and configure it. If * class name is null, this method will try and return a memory calculator * plugin available for this system. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java index 5934846..19db127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java @@ -30,6 +30,8 @@ public void testResourceUtilization() { ResourceUtilization u3 = ResourceUtilization.newInstance(10, 20, 0.5f); ResourceUtilization u4 = ResourceUtilization.newInstance(20, 20, 0.5f); ResourceUtilization u5 = ResourceUtilization.newInstance(30, 40, 0.8f); + ResourceUtilization u6 = ResourceUtilization.newInstance( + 10, 20, 0.5f, 1f, 2f); Assert.assertEquals(u1, u2); Assert.assertEquals(u1, u3); @@ -48,7 +50,7 @@ public void testResourceUtilization() { Assert.assertTrue(u1.getCPU() == 0.5f); Assert.assertEquals("", u1.toString()); + + ", vCores:0.5, disk:0.0, net:0.0>", u1.toString()); u1.addTo(10, 0, 0.0f); Assert.assertNotEquals(u1, u2); @@ -59,5 +61,9 @@ public void testResourceUtilization() { Assert.assertEquals(u1, u4); u1.subtractFrom(10, 0, 0.0f); Assert.assertEquals(u1, u3); + + u6.subtractFrom(5, 10, 0.1f); + Assert.assertEquals("", + u6.toString()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java index 3a78d87..e462b0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java @@ -142,11 +142,17 @@ public void run() { resourceCalculatorPlugin.getVirtualMemorySize() - resourceCalculatorPlugin.getAvailableVirtualMemorySize(); float vcores = resourceCalculatorPlugin.getNumVCoresUsed(); + float disk = resourceCalculatorPlugin.getStorageBytesPerSecRead() + + resourceCalculatorPlugin.getStorageBytesPerSecWritten(); + float net = resourceCalculatorPlugin.getNetworkBytesPerSecRead() + + resourceCalculatorPlugin.getNetworkBytesPerSecWritten(); nodeUtilization = ResourceUtilization.newInstance( (int) (pmem >> 20), // B -> MB (int) (vmem >> 20), // B -> MB - vcores); // Used Virtual Cores + vcores, // Used Virtual Cores + disk / (1024 * 1024), // B/s -> MB/s + net / (1024 * 1024)); // B/s -> MB/s try { Thread.sleep(monitoringInterval);