diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java index e8a5714..09790de1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -128,6 +128,18 @@ public static SysInfo newInstance() { public abstract long getNetworkBytesWritten(); /** + * Obtain the aggregated number of bytes per second read over the network. + * @return total number of bytes per second read. + */ + public abstract float getNetworkBytesPerSecRead(); + + /** + * Obtain the aggregated number of bytes per second written to the network. + * @return total number of bytes per second written. + */ + public abstract float getNetworkBytesPerSecWritten(); + + /** * Obtain the aggregated number of bytes read from disks. * * @return total number of bytes read. @@ -141,4 +153,18 @@ public static SysInfo newInstance() { */ public abstract long getStorageBytesWritten(); + /** + * Obtain the aggregated number of bytes per second read from disks. + * + * @return total number of bytes per second read. + */ + public abstract float getStorageBytesPerSecRead(); + + /** + * Obtain the aggregated number of bytes per second written to disks. + * + * @return total number of bytes per second written. + */ + public abstract float getStorageBytesPerSecWritten(); + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java index bba1631..f08c307 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -665,18 +665,48 @@ public long getNetworkBytesWritten() { return numNetBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecRead() { + // TODO + return -1f; + } + + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecWritten() { + // TODO + return -1f; + } + + /** {@inheritDoc} */ @Override public long getStorageBytesRead() { readProcDisksInfoFile(); return numDisksBytesRead; } + /** {@inheritDoc} */ @Override public long getStorageBytesWritten() { readProcDisksInfoFile(); return numDisksBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecRead() { + // TODO + return -1f; + } + + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecWritten() { + // TODO + return -1f; + } + /** * Test the {@link SysInfoLinux}. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java index 490c127..5d19916 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -45,9 +45,13 @@ private long cumulativeCpuTimeMs; private float cpuUsage; private long storageBytesRead; + private float storageBytesPerSecRead; private long storageBytesWritten; + private float storageBytesPerSecWritten; private long netBytesRead; + private float netBytesPerSecRead; private long netBytesWritten; + private float netBytesPerSecWritten; private long lastRefreshTime; static final int REFRESH_INTERVAL_MS = 1000; @@ -72,9 +76,13 @@ void reset() { cumulativeCpuTimeMs = -1; cpuUsage = -1; storageBytesRead = -1; + storageBytesPerSecRead = -1; storageBytesWritten = -1; + storageBytesPerSecWritten = -1; netBytesRead = -1; + netBytesPerSecRead = -1; netBytesWritten = -1; + netBytesPerSecWritten = -1; } String getSystemInfoInfoFromShell() { @@ -96,6 +104,10 @@ void refreshIfNeeded() { long refreshInterval = now - lastRefreshTime; lastRefreshTime = now; long lastCumCpuTimeMs = cumulativeCpuTimeMs; + long lastStorageBytesRead = storageBytesRead; + long lastStorageBytesWritten = storageBytesWritten; + long lastNetBytesRead = netBytesRead; + long lastNetBytesWritten = netBytesWritten; reset(); String sysInfoStr = getSystemInfoInfoFromShell(); if (sysInfoStr != null) { @@ -124,6 +136,22 @@ void refreshIfNeeded() { cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs) * 100F / refreshInterval; } + if (lastStorageBytesRead != -1) { + storageBytesPerSecRead = (float)((storageBytesRead - + lastStorageBytesRead) * 1000F / refreshInterval); + } + if (lastStorageBytesWritten != -1) { + storageBytesPerSecWritten = (float)((storageBytesWritten - + lastStorageBytesWritten) * 1000F / refreshInterval); + } + if (lastNetBytesRead != -1) { + netBytesPerSecRead = (float)((netBytesRead - lastNetBytesRead) + * 1000F / refreshInterval); + } + if (lastNetBytesWritten != -1) { + netBytesPerSecWritten = (float)((netBytesWritten - lastNetBytesWritten) + * 1000F / refreshInterval); + } } catch (NumberFormatException nfe) { LOG.warn("Error parsing sysInfo", nfe); } @@ -226,16 +254,46 @@ public long getNetworkBytesWritten() { return netBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecRead() { + refreshIfNeeded(); + return netBytesPerSecRead; + } + + /** {@inheritDoc} */ + @Override + public float getNetworkBytesPerSecWritten() { + refreshIfNeeded(); + return netBytesPerSecWritten; + } + + /** {@inheritDoc} */ @Override public long getStorageBytesRead() { refreshIfNeeded(); return storageBytesRead; } + /** {@inheritDoc} */ @Override public long getStorageBytesWritten() { refreshIfNeeded(); return storageBytesWritten; } + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecRead() { + refreshIfNeeded(); + return storageBytesPerSecRead; + } + + /** {@inheritDoc} */ + @Override + public float getStorageBytesPerSecWritten() { + refreshIfNeeded(); + return storageBytesPerSecWritten; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index 2ae4872..dc7e0e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -43,13 +43,27 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) { utilization.setCPU(cpu); return utilization; } +@Public + @Unstable + public static ResourceUtilization newInstance(int pmem, int vmem, float cpu, + float disk, float net) { + ResourceUtilization utilization = + Records.newRecord(ResourceUtilization.class); + utilization.setPhysicalMemory(pmem); + utilization.setVirtualMemory(vmem); + utilization.setCPU(cpu); + utilization.setDisk(disk); + utilization.setNetwork(net); + return utilization; + } @Public @Unstable public static ResourceUtilization newInstance( ResourceUtilization resourceUtil) { return newInstance(resourceUtil.getPhysicalMemory(), - resourceUtil.getVirtualMemory(), resourceUtil.getCPU()); + resourceUtil.getVirtualMemory(), resourceUtil.getCPU(), + resourceUtil.getDisk(), resourceUtil.getNetwork()); } /** @@ -106,6 +120,43 @@ public static ResourceUtilization newInstance( @Unstable public abstract void setCPU(float cpu); + /** + * Get disk utilization. + * + * @return disk utilization in MB/s + */ + @Public + @Unstable + public abstract float getDisk(); + + /** + * Set disk utilization. + * + * @param disk disk utilization in MB/s + */ + @Public + @Unstable + public abstract void setDisk(float disk); + + /** + * Get network utilization. + * + * @return network utilization in MB/s + */ + @Public + @Unstable + public abstract float getNetwork(); + + /** + * Set network utilization. + * + * @param disk network utilization in MB/s + */ + @Public + @Unstable + public abstract void setNetwork(float net); + + @Override public int hashCode() { final int prime = 263167; @@ -113,6 +164,8 @@ public int hashCode() { result = prime * result + getVirtualMemory(); result = prime * result + getPhysicalMemory(); result = 31 * result + Float.valueOf(getCPU()).hashCode(); + result = 31 * result + Float.valueOf(getDisk()).hashCode(); + result = 31 * result + Float.valueOf(getNetwork()).hashCode(); return result; } @@ -130,7 +183,9 @@ public boolean equals(Object obj) { ResourceUtilization other = (ResourceUtilization) obj; if (getVirtualMemory() != other.getVirtualMemory() || getPhysicalMemory() != other.getPhysicalMemory() - || getCPU() != other.getCPU()) { + || getCPU() != other.getCPU() + || getDisk() != other.getDisk() + || getNetwork() != other.getNetwork()) { return false; } return true; @@ -139,7 +194,8 @@ public boolean equals(Object obj) { @Override public String toString() { return ""; + + ", vCores:" + getCPU() + ", disk:" + getDisk() + ", net:" + + getNetwork() + ">"; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c84f4e9..2894606 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -62,6 +62,8 @@ message ResourceUtilizationProto { optional int32 pmem = 1; optional int32 vmem = 2; optional float cpu = 3; + optional float disk = 4; + optional float network = 5; } message ResourceOptionProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java index e37adbe..cd3b656 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -91,6 +91,30 @@ public void setCPU(float cpu) { } @Override + public float getDisk() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return p.getDisk(); + } + + @Override + public void setDisk(float disk) { + maybeInitBuilder(); + builder.setDisk(disk); + } + + @Override + public float getNetwork() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return p.getNetwork(); + } + + @Override + public void setNetwork(float net) { + maybeInitBuilder(); + builder.setNetwork(net); + } + + @Override public int compareTo(ResourceUtilization other) { int diff = this.getPhysicalMemory() - other.getPhysicalMemory(); if (diff == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 7b2ea56..3905100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -154,6 +154,22 @@ public long getNetworkBytesWritten() { } /** + * Obtain the aggregated number of bytes read over the network. + * @return total number of bytes read. + */ + public float getNetworkBytesPerSecRead() { + return sys.getNetworkBytesPerSecRead(); + } + + /** + * Obtain the aggregated number of bytes written to the network. + * @return total number of bytes written. + */ + public float getNetworkBytesPerSecWritten() { + return sys.getNetworkBytesPerSecWritten(); + } + + /** * Obtain the aggregated number of bytes read from disks. * * @return total number of bytes read. @@ -172,6 +188,24 @@ public long getStorageBytesWritten() { } /** + * Obtain the aggregated number of bytes read from disks. + * + * @return total number of bytes read. + */ + public float getStorageBytesPerSecRead() { + return sys.getStorageBytesPerSecRead(); + } + + /** + * Obtain the aggregated number of bytes written to disks. + * + * @return total number of bytes written. + */ + public float getStorageBytesPerSecWritten() { + return sys.getStorageBytesPerSecWritten(); + } + + /** * Create the ResourceCalculatorPlugin from the class name and configure it. If * class name is null, this method will try and return a memory calculator * plugin available for this system. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java index 3a78d87..e462b0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java @@ -142,11 +142,17 @@ public void run() { resourceCalculatorPlugin.getVirtualMemorySize() - resourceCalculatorPlugin.getAvailableVirtualMemorySize(); float vcores = resourceCalculatorPlugin.getNumVCoresUsed(); + float disk = resourceCalculatorPlugin.getStorageBytesRead() + + resourceCalculatorPlugin.getStorageBytesWritten(); + float net = resourceCalculatorPlugin.getNetworkBytesRead() + + resourceCalculatorPlugin.getNetworkBytesWritten(); nodeUtilization = ResourceUtilization.newInstance( (int) (pmem >> 20), // B -> MB (int) (vmem >> 20), // B -> MB - vcores); // Used Virtual Cores + vcores, // Used Virtual Cores + disk / (1024 * 1024), // B/s -> MB/s + net / (1024 * 1024)); // B/s -> MB/s try { Thread.sleep(monitoringInterval);