diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
index e8a5714..09790de1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
@@ -128,6 +128,18 @@ public static SysInfo newInstance() {
public abstract long getNetworkBytesWritten();
/**
+ * Obtain the aggregated number of bytes per second read over the network.
+ * @return total number of bytes per second read.
+ */
+ public abstract float getNetworkBytesPerSecRead();
+
+ /**
+ * Obtain the aggregated number of bytes per second written to the network.
+ * @return total number of bytes per second written.
+ */
+ public abstract float getNetworkBytesPerSecWritten();
+
+ /**
* Obtain the aggregated number of bytes read from disks.
*
* @return total number of bytes read.
@@ -141,4 +153,18 @@ public static SysInfo newInstance() {
*/
public abstract long getStorageBytesWritten();
+ /**
+ * Obtain the aggregated number of bytes per second read from disks.
+ *
+ * @return total number of bytes per second read.
+ */
+ public abstract float getStorageBytesPerSecRead();
+
+ /**
+ * Obtain the aggregated number of bytes per second written to disks.
+ *
+ * @return total number of bytes per second written.
+ */
+ public abstract float getStorageBytesPerSecWritten();
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
index bba1631..f08c307 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
@@ -665,18 +665,48 @@ public long getNetworkBytesWritten() {
return numNetBytesWritten;
}
+ /** {@inheritDoc} */
+ @Override
+ public float getNetworkBytesPerSecRead() {
+ // TODO
+ return -1f;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float getNetworkBytesPerSecWritten() {
+ // TODO
+ return -1f;
+ }
+
+ /** {@inheritDoc} */
@Override
public long getStorageBytesRead() {
readProcDisksInfoFile();
return numDisksBytesRead;
}
+ /** {@inheritDoc} */
@Override
public long getStorageBytesWritten() {
readProcDisksInfoFile();
return numDisksBytesWritten;
}
+ /** {@inheritDoc} */
+ @Override
+ public float getStorageBytesPerSecRead() {
+ // TODO
+ return -1f;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float getStorageBytesPerSecWritten() {
+ // TODO
+ return -1f;
+ }
+
/**
* Test the {@link SysInfoLinux}.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
index 490c127..5d19916 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java
@@ -45,9 +45,13 @@
private long cumulativeCpuTimeMs;
private float cpuUsage;
private long storageBytesRead;
+ private float storageBytesPerSecRead;
private long storageBytesWritten;
+ private float storageBytesPerSecWritten;
private long netBytesRead;
+ private float netBytesPerSecRead;
private long netBytesWritten;
+ private float netBytesPerSecWritten;
private long lastRefreshTime;
static final int REFRESH_INTERVAL_MS = 1000;
@@ -72,9 +76,13 @@ void reset() {
cumulativeCpuTimeMs = -1;
cpuUsage = -1;
storageBytesRead = -1;
+ storageBytesPerSecRead = -1;
storageBytesWritten = -1;
+ storageBytesPerSecWritten = -1;
netBytesRead = -1;
+ netBytesPerSecRead = -1;
netBytesWritten = -1;
+ netBytesPerSecWritten = -1;
}
String getSystemInfoInfoFromShell() {
@@ -96,6 +104,10 @@ void refreshIfNeeded() {
long refreshInterval = now - lastRefreshTime;
lastRefreshTime = now;
long lastCumCpuTimeMs = cumulativeCpuTimeMs;
+ long lastStorageBytesRead = storageBytesRead;
+ long lastStorageBytesWritten = storageBytesWritten;
+ long lastNetBytesRead = netBytesRead;
+ long lastNetBytesWritten = netBytesWritten;
reset();
String sysInfoStr = getSystemInfoInfoFromShell();
if (sysInfoStr != null) {
@@ -124,6 +136,22 @@ void refreshIfNeeded() {
cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs)
* 100F / refreshInterval;
}
+ if (lastStorageBytesRead != -1) {
+ storageBytesPerSecRead = (float)((storageBytesRead -
+ lastStorageBytesRead) * 1000F / refreshInterval);
+ }
+ if (lastStorageBytesWritten != -1) {
+ storageBytesPerSecWritten = (float)((storageBytesWritten -
+ lastStorageBytesWritten) * 1000F / refreshInterval);
+ }
+ if (lastNetBytesRead != -1) {
+ netBytesPerSecRead = (float)((netBytesRead - lastNetBytesRead)
+ * 1000F / refreshInterval);
+ }
+ if (lastNetBytesWritten != -1) {
+ netBytesPerSecWritten = (float)((netBytesWritten - lastNetBytesWritten)
+ * 1000F / refreshInterval);
+ }
} catch (NumberFormatException nfe) {
LOG.warn("Error parsing sysInfo", nfe);
}
@@ -226,16 +254,46 @@ public long getNetworkBytesWritten() {
return netBytesWritten;
}
+ /** {@inheritDoc} */
+ @Override
+ public float getNetworkBytesPerSecRead() {
+ refreshIfNeeded();
+ return netBytesPerSecRead;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float getNetworkBytesPerSecWritten() {
+ refreshIfNeeded();
+ return netBytesPerSecWritten;
+ }
+
+ /** {@inheritDoc} */
@Override
public long getStorageBytesRead() {
refreshIfNeeded();
return storageBytesRead;
}
+ /** {@inheritDoc} */
@Override
public long getStorageBytesWritten() {
refreshIfNeeded();
return storageBytesWritten;
}
+ /** {@inheritDoc} */
+ @Override
+ public float getStorageBytesPerSecRead() {
+ refreshIfNeeded();
+ return storageBytesPerSecRead;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float getStorageBytesPerSecWritten() {
+ refreshIfNeeded();
+ return storageBytesPerSecWritten;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
index 2ae4872..dc7e0e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
@@ -43,13 +43,27 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) {
utilization.setCPU(cpu);
return utilization;
}
+@Public
+ @Unstable
+ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu,
+ float disk, float net) {
+ ResourceUtilization utilization =
+ Records.newRecord(ResourceUtilization.class);
+ utilization.setPhysicalMemory(pmem);
+ utilization.setVirtualMemory(vmem);
+ utilization.setCPU(cpu);
+ utilization.setDisk(disk);
+ utilization.setNetwork(net);
+ return utilization;
+ }
@Public
@Unstable
public static ResourceUtilization newInstance(
ResourceUtilization resourceUtil) {
return newInstance(resourceUtil.getPhysicalMemory(),
- resourceUtil.getVirtualMemory(), resourceUtil.getCPU());
+ resourceUtil.getVirtualMemory(), resourceUtil.getCPU(),
+ resourceUtil.getDisk(), resourceUtil.getNetwork());
}
/**
@@ -106,6 +120,43 @@ public static ResourceUtilization newInstance(
@Unstable
public abstract void setCPU(float cpu);
+ /**
+ * Get disk utilization.
+ *
+ * @return disk utilization in MB/s
+ */
+ @Public
+ @Unstable
+ public abstract float getDisk();
+
+ /**
+ * Set disk utilization.
+ *
+ * @param disk disk utilization in MB/s
+ */
+ @Public
+ @Unstable
+ public abstract void setDisk(float disk);
+
+ /**
+ * Get network utilization.
+ *
+ * @return network utilization in MB/s
+ */
+ @Public
+ @Unstable
+ public abstract float getNetwork();
+
+ /**
+ * Set network utilization.
+ *
+ * @param disk network utilization in MB/s
+ */
+ @Public
+ @Unstable
+ public abstract void setNetwork(float net);
+
+
@Override
public int hashCode() {
final int prime = 263167;
@@ -113,6 +164,8 @@ public int hashCode() {
result = prime * result + getVirtualMemory();
result = prime * result + getPhysicalMemory();
result = 31 * result + Float.valueOf(getCPU()).hashCode();
+ result = 31 * result + Float.valueOf(getDisk()).hashCode();
+ result = 31 * result + Float.valueOf(getNetwork()).hashCode();
return result;
}
@@ -130,7 +183,9 @@ public boolean equals(Object obj) {
ResourceUtilization other = (ResourceUtilization) obj;
if (getVirtualMemory() != other.getVirtualMemory()
|| getPhysicalMemory() != other.getPhysicalMemory()
- || getCPU() != other.getCPU()) {
+ || getCPU() != other.getCPU()
+ || getDisk() != other.getDisk()
+ || getNetwork() != other.getNetwork()) {
return false;
}
return true;
@@ -139,7 +194,8 @@ public boolean equals(Object obj) {
@Override
public String toString() {
return "";
+ + ", vCores:" + getCPU() + ", disk:" + getDisk() + ", net:"
+ + getNetwork() + ">";
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index c84f4e9..2894606 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -62,6 +62,8 @@ message ResourceUtilizationProto {
optional int32 pmem = 1;
optional int32 vmem = 2;
optional float cpu = 3;
+ optional float disk = 4;
+ optional float network = 5;
}
message ResourceOptionProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java
index e37adbe..cd3b656 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java
@@ -91,6 +91,30 @@ public void setCPU(float cpu) {
}
@Override
+ public float getDisk() {
+ ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getDisk();
+ }
+
+ @Override
+ public void setDisk(float disk) {
+ maybeInitBuilder();
+ builder.setDisk(disk);
+ }
+
+ @Override
+ public float getNetwork() {
+ ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getNetwork();
+ }
+
+ @Override
+ public void setNetwork(float net) {
+ maybeInitBuilder();
+ builder.setNetwork(net);
+ }
+
+ @Override
public int compareTo(ResourceUtilization other) {
int diff = this.getPhysicalMemory() - other.getPhysicalMemory();
if (diff == 0) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
index 7b2ea56..3905100 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
@@ -154,6 +154,22 @@ public long getNetworkBytesWritten() {
}
/**
+ * Obtain the aggregated number of bytes read over the network.
+ * @return total number of bytes read.
+ */
+ public float getNetworkBytesPerSecRead() {
+ return sys.getNetworkBytesPerSecRead();
+ }
+
+ /**
+ * Obtain the aggregated number of bytes written to the network.
+ * @return total number of bytes written.
+ */
+ public float getNetworkBytesPerSecWritten() {
+ return sys.getNetworkBytesPerSecWritten();
+ }
+
+ /**
* Obtain the aggregated number of bytes read from disks.
*
* @return total number of bytes read.
@@ -172,6 +188,24 @@ public long getStorageBytesWritten() {
}
/**
+ * Obtain the aggregated number of bytes read from disks.
+ *
+ * @return total number of bytes read.
+ */
+ public float getStorageBytesPerSecRead() {
+ return sys.getStorageBytesPerSecRead();
+ }
+
+ /**
+ * Obtain the aggregated number of bytes written to disks.
+ *
+ * @return total number of bytes written.
+ */
+ public float getStorageBytesPerSecWritten() {
+ return sys.getStorageBytesPerSecWritten();
+ }
+
+ /**
* Create the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator
* plugin available for this system.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
index 3a78d87..e462b0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
@@ -142,11 +142,17 @@ public void run() {
resourceCalculatorPlugin.getVirtualMemorySize()
- resourceCalculatorPlugin.getAvailableVirtualMemorySize();
float vcores = resourceCalculatorPlugin.getNumVCoresUsed();
+ float disk = resourceCalculatorPlugin.getStorageBytesRead() +
+ resourceCalculatorPlugin.getStorageBytesWritten();
+ float net = resourceCalculatorPlugin.getNetworkBytesRead() +
+ resourceCalculatorPlugin.getNetworkBytesWritten();
nodeUtilization =
ResourceUtilization.newInstance(
(int) (pmem >> 20), // B -> MB
(int) (vmem >> 20), // B -> MB
- vcores); // Used Virtual Cores
+ vcores, // Used Virtual Cores
+ disk / (1024 * 1024), // B/s -> MB/s
+ net / (1024 * 1024)); // B/s -> MB/s
try {
Thread.sleep(monitoringInterval);