commit 68275d76c8910fb901b2818367097201743d7704
Author: Sahil Takiar
Date:   Wed Apr 11 16:22:52 2018 -0700

    HIVE-18652: Print Spark metrics on console

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 803877162d..81cd06b4df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,6 +162,7 @@ public int execute(DriverContext driverContext) {
 
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
+        printConsoleMetrics();
         printExcessiveGCWarning();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
@@ -222,6 +224,77 @@ public int execute(DriverContext driverContext) {
     return rc;
   }
 
+  private void printConsoleMetrics() {
+    SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup(
+        SparkStatisticsNames.SPARK_GROUP_NAME);
+
+    if (sparkStatisticGroup != null) {
+      String colon = ": ";
+      String forwardSlash = " / ";
+      String separator = ", ";
+
+      String metricsString = String.format("Spark Job[%d] Metrics: ", sparkJobID) +
+
+          // Task Duration Time
+          SparkStatisticsNames.TASK_DURATION_TIME +
+          colon +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.TASK_DURATION_TIME) +
+          separator +
+
+          // Executor CPU Time
+          SparkStatisticsNames.EXECUTOR_CPU_TIME +
+          colon +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.EXECUTOR_CPU_TIME) +
+          separator +
+
+          // JVM GC Time
+          SparkStatisticsNames.JVM_GC_TIME +
+          colon +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.JVM_GC_TIME) +
+          separator +
+
+          // Bytes Read / Records Read
+          SparkStatisticsNames.BYTES_READ +
+          forwardSlash +
+          SparkStatisticsNames.RECORDS_READ +
+          colon +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.BYTES_READ) +
+          forwardSlash +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.RECORDS_READ) +
+          separator +
+
+          // Shuffle Read Bytes / Shuffle Read Records
+          SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ +
+          forwardSlash +
+          SparkStatisticsNames.SHUFFLE_RECORDS_READ +
+          colon +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ) +
+          forwardSlash +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.SHUFFLE_RECORDS_READ) +
+          separator +
+
+          // Shuffle Write Bytes / Shuffle Write Records
+          SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN +
+          forwardSlash +
+          SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN +
+          colon +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN) +
+          forwardSlash +
+          SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN);
+
+      console.printInfo(metricsString);
+    }
+  }
+
   /**
    * Use the Spark metrics and calculate how much task execution time was spent performing GC
    * operations. If more than a defined threshold of time is spent, print out a warning on the
@@ -231,10 +304,10 @@ private void printExcessiveGCWarning() {
     SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup(
         SparkStatisticsNames.SPARK_GROUP_NAME);
     if (sparkStatisticGroup != null) {
-      long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
-          SparkStatisticsNames.TASK_DURATION_TIME).getValue());
-      long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
-          SparkStatisticsNames.JVM_GC_TIME).getValue());
+      long taskDurationTime = SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+          SparkStatisticsNames.TASK_DURATION_TIME);
+      long jvmGCTime = SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+          SparkStatisticsNames.JVM_GC_TIME);
 
       // Threshold percentage to trigger the GC warning
       double threshold = 0.1;
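[Illustration, not part of the patch] With the concatenation above, printConsoleMetrics() emits one console line per Spark job. A minimal sketch of the resulting format, using only constant values that appear elsewhere in this commit; the numbers are invented:

    // Sketch: reproduces the line shape built above; all values are hypothetical.
    public class ConsoleMetricsLineSketch {
      public static void main(String[] args) {
        int sparkJobID = 0;
        long taskDuration = 4321L, bytesRead = 8192L, recordsRead = 100L;
        String line = String.format("Spark Job[%d] Metrics: ", sparkJobID)
            + "TaskDurationTime" + ": " + taskDuration + ", "
            + "BytesRead" + " / " + "RecordsRead" + ": "
            + bytesRead + " / " + recordsRead;
        // Prints: Spark Job[0] Metrics: TaskDurationTime: 4321, BytesRead / RecordsRead: 8192 / 100
        System.out.println(line);
      }
    }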
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index 68e4f9e456..12c3eac789 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -31,15 +31,28 @@
   public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
   public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
   public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+
+  public static final String TASK_DURATION_TIME = "TaskDurationTime";
+
+  // Input Metrics
   public static final String BYTES_READ = "BytesRead";
-  public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
-  public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
-  public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
-  public static final String FETCH_WAIT_TIME = "FetchWaitTime";
-  public static final String REMOTE_BYTES_READ = "RemoteBytesRead";
+  public static final String RECORDS_READ = "RecordsRead";
+
+  // Shuffle Read Metrics
+  public static final String SHUFFLE_FETCH_WAIT_TIME = "ShuffleFetchWaitTime";
+  public static final String SHUFFLE_REMOTE_BYTES_READ = "ShuffleRemoteBytesRead";
+  public static final String SHUFFLE_LOCAL_BYTES_READ = "ShuffleLocalBytesRead";
+  public static final String SHUFFLE_TOTAL_BYTES_READ = "ShuffleTotalBytesRead";
+  public static final String SHUFFLE_REMOTE_BLOCKS_FETCHED = "ShuffleRemoteBlocksFetched";
+  public static final String SHUFFLE_LOCAL_BLOCKS_FETCHED = "ShuffleLocalBlocksFetched";
+  public static final String SHUFFLE_TOTAL_BLOCKS_FETCHED = "ShuffleTotalBlocksFetched";
+  public static final String SHUFFLE_REMOTE_BYTES_READ_TO_DISK = "ShuffleRemoteBytesReadToDisk";
+  public static final String SHUFFLE_RECORDS_READ = "ShuffleRecordsRead";
+
+  // Shuffle Write Metrics
   public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
   public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
-  public static final String TASK_DURATION_TIME = "TaskDurationTime";
+  public static final String SHUFFLE_RECORDS_WRITTEN = "ShuffleRecordsWritten";
 
   public static final String SPARK_GROUP_NAME = "SPARK";
 }
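[Note, not part of the patch] The renames above give every shuffle-read key a "Shuffle" prefix so it cannot be confused with the input metrics; any consumer that looks statistics up by name has to migrate. A small sketch:

    import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;

    // Sketch: migrating a by-name lookup from a removed key to its replacement.
    public class ShuffleKeyMigrationSketch {
      public static void main(String[] args) {
        String oldKey = "FetchWaitTime";                               // constant removed above
        String newKey = SparkStatisticsNames.SHUFFLE_FETCH_WAIT_TIME;  // "ShuffleFetchWaitTime"
        System.out.println(oldKey + " -> " + newKey);
      }
    }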
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index fab5422f1f..c73c1505ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -20,45 +20,58 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 
-final class SparkMetricsUtils {
+public final class SparkMetricsUtils {
 
   private SparkMetricsUtils(){}
 
   static Map<String, Long> collectMetrics(Metrics allMetrics) {
     Map<String, Long> results = new LinkedHashMap<String, Long>();
+    results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
+    results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
+    results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
+    results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
     results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME,
         allMetrics.executorDeserializeTime);
     results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME,
         allMetrics.executorDeserializeCpuTime);
-    results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
-    results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
     results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
-    results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
     results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME,
         allMetrics.resultSerializationTime);
-    results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
-    results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
-    results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {
       results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead);
+      results.put(SparkStatisticsNames.RECORDS_READ, allMetrics.inputMetrics.recordsRead);
     }
     if (allMetrics.shuffleReadMetrics != null) {
       ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
       long rbf = shuffleReadMetrics.remoteBlocksFetched;
       long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf);
-      results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf);
-      results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf);
-      results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
-      results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ,
+          shuffleReadMetrics.remoteBytesRead + shuffleReadMetrics.localBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_LOCAL_BYTES_READ, shuffleReadMetrics.localBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BYTES_READ_TO_DISK, shuffleReadMetrics
+          .remoteBytesReadToDisk);
+      results.put(SparkStatisticsNames.SHUFFLE_RECORDS_READ, shuffleReadMetrics.recordsRead);
+      results.put(SparkStatisticsNames.SHUFFLE_TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(SparkStatisticsNames.SHUFFLE_LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(SparkStatisticsNames.SHUFFLE_FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
     }
     if (allMetrics.shuffleWriteMetrics != null) {
       results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN,
          allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN,
+          allMetrics.shuffleWriteMetrics.shuffleRecordsWritten);
       results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME,
          allMetrics.shuffleWriteMetrics.shuffleWriteTime);
     }
     return results;
   }
+
+  public static long getSparkStatisticAsLong(SparkStatisticGroup group, String name) {
+    return Long.parseLong(group.getSparkStatistic(name).getValue());
+  }
 }
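[Usage sketch, not part of the patch] The class is made public chiefly so SparkTask can reach the new getSparkStatisticAsLong() helper. A hedged caller-side sketch, mirroring the lookups in SparkTask above; note the helper assumes the named statistic exists and holds a numeric string, so a missing name surfaces as a NullPointerException and a non-numeric value as a NumberFormatException:

    // Hypothetical caller; sparkStatistics is a SparkStatistics field as in SparkTask.
    static long readJvmGcTime(SparkStatistics sparkStatistics) {
      SparkStatisticGroup group = sparkStatistics.getStatisticGroup(
          SparkStatisticsNames.SPARK_GROUP_NAME);
      // Guard against a missing group, as the callers above do.
      return group != null
          ? SparkMetricsUtils.getSparkStatisticAsLong(group, SparkStatisticsNames.JVM_GC_TIME)
          : 0L;
    }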
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 2f3c026212..a0db015ae7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -154,6 +154,7 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
     // Input metrics.
     boolean hasInputMetrics = false;
     long bytesRead = 0L;
+    long recordsRead = 0L;
 
     // Shuffle read metrics.
     boolean hasShuffleReadMetrics = false;
@@ -161,10 +162,14 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
     int localBlocksFetched = 0;
     long fetchWaitTime = 0L;
     long remoteBytesRead = 0L;
+    long localBytesRead = 0L;
+    long remoteBytesReadToDisk = 0L;
+    long shuffleRecordsRead = 0L;
 
     // Shuffle write metrics.
     long shuffleBytesWritten = 0L;
     long shuffleWriteTime = 0L;
+    long shuffleRecordsWritten = 0L;
 
     for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
       Metrics m = info.metrics;
@@ -182,6 +187,7 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
       if (m.inputMetrics != null) {
         hasInputMetrics = true;
         bytesRead += m.inputMetrics.bytesRead;
+        recordsRead += m.inputMetrics.recordsRead;
       }
 
       if (m.shuffleReadMetrics != null) {
@@ -190,17 +196,21 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
         localBlocksFetched += m.shuffleReadMetrics.localBlocksFetched;
         fetchWaitTime += m.shuffleReadMetrics.fetchWaitTime;
         remoteBytesRead += m.shuffleReadMetrics.remoteBytesRead;
+        localBytesRead += m.shuffleReadMetrics.localBytesRead;
+        remoteBytesReadToDisk += m.shuffleReadMetrics.remoteBytesReadToDisk;
+        shuffleRecordsRead += m.shuffleReadMetrics.recordsRead;
       }
 
       if (m.shuffleWriteMetrics != null) {
         shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
         shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
+        shuffleRecordsWritten += m.shuffleWriteMetrics.shuffleRecordsWritten;
       }
     }
 
     InputMetrics inputMetrics = null;
     if (hasInputMetrics) {
-      inputMetrics = new InputMetrics(bytesRead);
+      inputMetrics = new InputMetrics(bytesRead, recordsRead);
     }
 
     ShuffleReadMetrics shuffleReadMetrics = null;
@@ -209,14 +219,18 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
         remoteBlocksFetched,
         localBlocksFetched,
         fetchWaitTime,
-        remoteBytesRead);
+        remoteBytesRead,
+        localBytesRead,
+        remoteBytesReadToDisk,
+        shuffleRecordsRead);
     }
 
     ShuffleWriteMetrics shuffleWriteMetrics = null;
     if (hasShuffleReadMetrics) {
       shuffleWriteMetrics = new ShuffleWriteMetrics(
         shuffleBytesWritten,
-        shuffleWriteTime);
+        shuffleWriteTime,
+        shuffleRecordsWritten);
     }
 
     return new Metrics(
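[Semantics sketch, not part of the patch] aggregate() rolls each new counter up by plain addition over the filtered tasks. Using the widened InputMetrics constructor from the next file, with invented values:

    // Two tasks' input metrics aggregate by summation, exactly as aggregate() computes.
    InputMetrics task1 = new InputMetrics(1024L, 100L);
    InputMetrics task2 = new InputMetrics(2048L, 50L);
    long totalBytesRead = task1.bytesRead + task2.bytesRead;        // 3072
    long totalRecordsRead = task1.recordsRead + task2.recordsRead;  // 150

One pre-existing quirk visible in the surrounding context: the shuffle-write block is still guarded by hasShuffleReadMetrics rather than a dedicated flag; that guard predates this patch.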
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index 6a13071345..a162f48431 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,20 +28,26 @@
  */
 @InterfaceAudience.Private
 public class InputMetrics implements Serializable {
+  /** Total number of bytes read. */
   public final long bytesRead;
+  /** Total number of records read. */
+  public final long recordsRead;
 
   private InputMetrics() {
     // For Serialization only.
-    this(0L);
+    this(0L, 0L);
   }
 
   public InputMetrics(
-      long bytesRead) {
+      long bytesRead,
+      long recordsRead) {
     this.bytesRead = bytesRead;
+    this.recordsRead = recordsRead;
   }
 
   public InputMetrics(TaskMetrics metrics) {
-    this(metrics.inputMetrics().bytesRead());
+    this(metrics.inputMetrics().bytesRead(),
+        metrics.inputMetrics().recordsRead());
   }
 
   @Override
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index e3d564f576..ec7113694e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -42,28 +42,43 @@
   public final long fetchWaitTime;
   /** Total number of remote bytes read from the shuffle by tasks. */
   public final long remoteBytesRead;
+  /** Shuffle data that was read from the local disk (as opposed to from a remote executor). */
+  public final long localBytesRead;
+  /** Total number of remote bytes read to disk from the shuffle by this task. */
+  public final long remoteBytesReadToDisk;
+  /** Total number of records read from the shuffle by this task. */
+  public final long recordsRead;
 
   private ShuffleReadMetrics() {
     // For Serialization only.
-    this(0, 0, 0L, 0L);
+    this(0, 0, 0L, 0L, 0L, 0L, 0L);
   }
 
   public ShuffleReadMetrics(
       long remoteBlocksFetched,
       long localBlocksFetched,
       long fetchWaitTime,
-      long remoteBytesRead) {
+      long remoteBytesRead,
+      long localBytesRead,
+      long remoteBytesReadToDisk,
+      long recordsRead) {
     this.remoteBlocksFetched = remoteBlocksFetched;
     this.localBlocksFetched = localBlocksFetched;
     this.fetchWaitTime = fetchWaitTime;
     this.remoteBytesRead = remoteBytesRead;
+    this.localBytesRead = localBytesRead;
+    this.remoteBytesReadToDisk = remoteBytesReadToDisk;
+    this.recordsRead = recordsRead;
   }
 
   public ShuffleReadMetrics(TaskMetrics metrics) {
     this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
         metrics.shuffleReadMetrics().localBlocksFetched(),
         metrics.shuffleReadMetrics().fetchWaitTime(),
-        metrics.shuffleReadMetrics().remoteBytesRead());
+        metrics.shuffleReadMetrics().remoteBytesRead(),
+        metrics.shuffleReadMetrics().localBytesRead(),
+        metrics.shuffleReadMetrics().remoteBytesReadToDisk(),
+        metrics.shuffleReadMetrics().recordsRead());
   }
 
   /**
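[Derivation sketch, not part of the patch] The ShuffleTotalBytesRead figure printed on the console is derived rather than stored: SparkMetricsUtils above sums the two new byte counters. With invented argument values:

    // Constructor order: remoteBlocksFetched, localBlocksFetched, fetchWaitTime,
    // remoteBytesRead, localBytesRead, remoteBytesReadToDisk, recordsRead.
    ShuffleReadMetrics srm = new ShuffleReadMetrics(2L, 3L, 10L, 4096L, 1024L, 0L, 50L);
    long shuffleTotalBytesRead = srm.remoteBytesRead + srm.localBytesRead;  // 5120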
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index e9cf6a1d9a..781bf53ebb 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -33,22 +33,27 @@
   public final long shuffleBytesWritten;
   /** Time tasks spent blocking on writes to disk or buffer cache, in nanoseconds. */
   public final long shuffleWriteTime;
+  /** Total number of records written to the shuffle by this task. */
+  public final long shuffleRecordsWritten;
 
   private ShuffleWriteMetrics() {
     // For Serialization only.
-    this(0L, 0L);
+    this(0L, 0L, 0L);
   }
 
   public ShuffleWriteMetrics(
       long shuffleBytesWritten,
-      long shuffleWriteTime) {
+      long shuffleWriteTime,
+      long shuffleRecordsWritten) {
     this.shuffleBytesWritten = shuffleBytesWritten;
     this.shuffleWriteTime = shuffleWriteTime;
+    this.shuffleRecordsWritten = shuffleRecordsWritten;
   }
 
   public ShuffleWriteMetrics(TaskMetrics metrics) {
     this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
-        metrics.shuffleWriteMetrics().shuffleWriteTime());
+        metrics.shuffleWriteMetrics().shuffleWriteTime(),
+        metrics.shuffleWriteMetrics().shuffleRecordsWritten());
   }
 
   @Override
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index c5884cf06d..2d4c43dba8 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -96,9 +96,9 @@ public void testInputReadMethodAggregation() {
     long value = taskValue(1, 1, 1);
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
-        value, new InputMetrics(value), null, null);
+        value, new InputMetrics(value, value), null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
-        value, new InputMetrics(value), null, null);
+        value, new InputMetrics(value, value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -110,9 +110,9 @@ public void testInputReadMethodAggregation() {
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
     return new Metrics(value, value, value, value, value, value, value, value, value, value,
-      new InputMetrics(value),
-      new ShuffleReadMetrics((int) value, (int) value, value, value),
-      new ShuffleWriteMetrics(value, value));
+      new InputMetrics(value, value),
+      new ShuffleReadMetrics((int) value, (int) value, value, value, value, value, value),
+      new ShuffleWriteMetrics(value, value, value));
   }
 
   /**
@@ -160,14 +160,19 @@ private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.taskDurationTime);
 
     assertEquals(expected, metrics.inputMetrics.bytesRead);
+    assertEquals(expected, metrics.inputMetrics.recordsRead);
 
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);
     assertEquals(expected, metrics.shuffleReadMetrics.localBlocksFetched);
     assertEquals(expected, metrics.shuffleReadMetrics.fetchWaitTime);
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesRead);
+    assertEquals(expected, metrics.shuffleReadMetrics.localBytesRead);
+    assertEquals(expected, metrics.shuffleReadMetrics.recordsRead);
+    assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesReadToDisk);
 
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
+    assertEquals(expected, metrics.shuffleWriteMetrics.shuffleRecordsWritten);
   }
 }
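[Closing note on the test arithmetic, not part of the patch] makeMetrics() packs the (jobId, stageId, taskId) coordinates into a single long, so every metric field of a task, including the new record counters, carries a recognizable value that checkMetrics() can assert after aggregation:

    // jobId=1, stageId=2, taskId=3  ->  1000000*1 + 1000*2 + 3 = 1002003
    long value = 1000000 * 1 + 1000 * 2 + 3;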