diff --git spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java index 236eaec..52428f0 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java +++ spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java @@ -26,7 +26,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; @@ -178,53 +177,50 @@ private Metrics aggregate(Predicate filter) { memoryBytesSpilled += m.memoryBytesSpilled; diskBytesSpilled += m.diskBytesSpilled; - if (m.inputMetrics.isPresent()) { + if (m.inputMetrics != null) { hasInputMetrics = true; - InputMetrics im = m.inputMetrics.get(); if (readMethod == null) { - readMethod = im.readMethod; - } else if (readMethod != im.readMethod) { + readMethod = m.inputMetrics.readMethod; + } else if (readMethod != m.inputMetrics.readMethod) { readMethod = DataReadMethod.Multiple; } - bytesRead += im.bytesRead; + bytesRead += m.inputMetrics.bytesRead; } - if (m.shuffleReadMetrics.isPresent()) { - ShuffleReadMetrics srm = m.shuffleReadMetrics.get(); + if (m.shuffleReadMetrics != null) { hasShuffleReadMetrics = true; - remoteBlocksFetched += srm.remoteBlocksFetched; - localBlocksFetched += srm.localBlocksFetched; - fetchWaitTime += srm.fetchWaitTime; - remoteBytesRead += srm.remoteBytesRead; + remoteBlocksFetched += m.shuffleReadMetrics.remoteBlocksFetched; + localBlocksFetched += m.shuffleReadMetrics.localBlocksFetched; + fetchWaitTime += m.shuffleReadMetrics.fetchWaitTime; + remoteBytesRead += m.shuffleReadMetrics.remoteBytesRead; } - if (m.shuffleWriteMetrics.isPresent()) { - ShuffleWriteMetrics swm = m.shuffleWriteMetrics.get(); + if (m.shuffleWriteMetrics != null) { hasShuffleWriteMetrics = true; - shuffleBytesWritten += swm.shuffleBytesWritten; - shuffleWriteTime += swm.shuffleWriteTime; + shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten; + shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime; } } - Optional inputMetrics = Optional.absent(); + InputMetrics inputMetrics = null; if (hasInputMetrics) { - inputMetrics = Optional.of(new InputMetrics(readMethod, bytesRead)); + inputMetrics = new InputMetrics(readMethod, bytesRead); } - Optional shuffleReadMetrics = Optional.absent(); + ShuffleReadMetrics shuffleReadMetrics = null; if (hasShuffleReadMetrics) { - shuffleReadMetrics = Optional.of(new ShuffleReadMetrics( + shuffleReadMetrics = new ShuffleReadMetrics( remoteBlocksFetched, localBlocksFetched, fetchWaitTime, - remoteBytesRead)); + remoteBytesRead); } - Optional shuffleWriteMetrics = Optional.absent(); + ShuffleWriteMetrics shuffleWriteMetrics = null; if (hasShuffleReadMetrics) { - shuffleWriteMetrics = Optional.of(new ShuffleWriteMetrics( + shuffleWriteMetrics = new ShuffleWriteMetrics( shuffleBytesWritten, - shuffleWriteTime)); + shuffleWriteTime); } return new Metrics( diff --git spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java index 11f7151..87a20fc 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java +++ spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java @@ -19,7 +19,6 @@ import java.io.Serializable; -import com.google.common.base.Optional; import org.apache.spark.executor.TaskMetrics; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -48,14 +47,14 @@ /** The number of on-disk bytes spilled by tasks. */ public final long diskBytesSpilled; /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */ - public final Optional inputMetrics; + public final InputMetrics inputMetrics; /** * If tasks read from shuffle output, metrics on getting shuffle data. This includes read metrics * aggregated over all the tasks' shuffle dependencies. */ - public final Optional shuffleReadMetrics; + public final ShuffleReadMetrics shuffleReadMetrics; /** If tasks wrote to shuffle output, metrics on the written shuffle data. */ - public final Optional shuffleWriteMetrics; + public final ShuffleWriteMetrics shuffleWriteMetrics; public Metrics( long executorDeserializeTime, @@ -65,9 +64,9 @@ public Metrics( long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, - Optional inputMetrics, - Optional shuffleReadMetrics, - Optional shuffleWriteMetrics) { + InputMetrics inputMetrics, + ShuffleReadMetrics shuffleReadMetrics, + ShuffleWriteMetrics shuffleWriteMetrics) { this.executorDeserializeTime = executorDeserializeTime; this.executorRunTime = executorRunTime; this.resultSize = resultSize; @@ -94,24 +93,16 @@ public Metrics(TaskMetrics metrics) { optionalShuffleWriteMetrics(metrics)); } - private static final Optional optionalInputMetric(TaskMetrics metrics) { - return metrics.inputMetrics().isDefined() - ? Optional.of(new InputMetrics(metrics)) - : Optional.absent(); + private static final InputMetrics optionalInputMetric(TaskMetrics metrics) { + return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null; } - private static final Optional - optionalShuffleReadMetric(TaskMetrics metrics) { - return metrics.shuffleReadMetrics().isDefined() - ? Optional.of(new ShuffleReadMetrics(metrics)) - : Optional.absent(); + private static final ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) { + return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null; } - private static final Optional - optionalShuffleWriteMetrics(TaskMetrics metrics) { - return metrics.shuffleWriteMetrics().isDefined() - ? Optional.of(new ShuffleWriteMetrics(metrics)) - : Optional.absent(); + private static final ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) { + return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null; } } diff --git spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java index be6f68d..f87fc28 100644 --- spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java +++ spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java @@ -19,7 +19,6 @@ import java.util.Arrays; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.junit.Test; @@ -62,9 +61,7 @@ public void testMetricsAggregation() { public void testOptionalMetrics() { long value = taskValue(1, 1, 1L); Metrics metrics = new Metrics(value, value, value, value, value, value, value, - Optional.absent(), - Optional.absent(), - Optional.absent()); + null, null, null); MetricsCollection collection = new MetricsCollection(); for (int i : Arrays.asList(1, 2)) { @@ -72,18 +69,18 @@ public void testOptionalMetrics() { } Metrics global = collection.getAllMetrics(); - assertFalse(global.inputMetrics.isPresent()); - assertFalse(global.shuffleReadMetrics.isPresent()); - assertFalse(global.shuffleWriteMetrics.isPresent()); + assertNull(global.inputMetrics); + assertNull(global.shuffleReadMetrics); + assertNull(global.shuffleWriteMetrics); collection.addMetrics(3, 1, 1, makeMetrics(3, 1, 1)); Metrics global2 = collection.getAllMetrics(); - assertTrue(global2.inputMetrics.isPresent()); - assertEquals(taskValue(3, 1, 1), global2.inputMetrics.get().bytesRead); + assertNotNull(global2.inputMetrics); + assertEquals(taskValue(3, 1, 1), global2.inputMetrics.bytesRead); - assertTrue(global2.shuffleReadMetrics.isPresent()); - assertTrue(global2.shuffleWriteMetrics.isPresent()); + assertNotNull(global2.shuffleReadMetrics); + assertNotNull(global2.shuffleWriteMetrics); } @Test @@ -92,28 +89,24 @@ public void testInputReadMethodAggregation() { long value = taskValue(1, 1, 1); Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, - Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)), - Optional.absent(), - Optional.absent()); + new InputMetrics(DataReadMethod.Memory, value), null, null); Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, - Optional.fromNullable(new InputMetrics(DataReadMethod.Disk, value)), - Optional.absent(), - Optional.absent()); + new InputMetrics(DataReadMethod.Disk, value), null, null); collection.addMetrics(1, 1, 1, metrics1); collection.addMetrics(1, 1, 2, metrics2); Metrics global = collection.getAllMetrics(); - assertTrue(global.inputMetrics.isPresent()); - assertEquals(DataReadMethod.Multiple, global.inputMetrics.get().readMethod); + assertNotNull(global.inputMetrics); + assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod); } private Metrics makeMetrics(int jobId, int stageId, long taskId) { long value = 1000000 * jobId + 1000 * stageId + taskId; return new Metrics(value, value, value, value, value, value, value, - Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)), - Optional.fromNullable(new ShuffleReadMetrics((int) value, (int) value, value, value)), - Optional.fromNullable(new ShuffleWriteMetrics(value, value))); + new InputMetrics(DataReadMethod.Memory, value), + new ShuffleReadMetrics((int) value, (int) value, value, value), + new ShuffleWriteMetrics(value, value)); } /** @@ -157,19 +150,16 @@ private void checkMetrics(Metrics metrics, long expected) { assertEquals(expected, metrics.memoryBytesSpilled); assertEquals(expected, metrics.diskBytesSpilled); - InputMetrics im = metrics.inputMetrics.get(); - assertEquals(DataReadMethod.Memory, im.readMethod); - assertEquals(expected, im.bytesRead); + assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod); + assertEquals(expected, metrics.inputMetrics.bytesRead); - ShuffleReadMetrics srm = metrics.shuffleReadMetrics.get(); - assertEquals(expected, srm.remoteBlocksFetched); - assertEquals(expected, srm.localBlocksFetched); - assertEquals(expected, srm.fetchWaitTime); - assertEquals(expected, srm.remoteBytesRead); + assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched); + assertEquals(expected, metrics.shuffleReadMetrics.localBlocksFetched); + assertEquals(expected, metrics.shuffleReadMetrics.fetchWaitTime); + assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesRead); - ShuffleWriteMetrics swm = metrics.shuffleWriteMetrics.get(); - assertEquals(expected, swm.shuffleBytesWritten); - assertEquals(expected, swm.shuffleWriteTime); + assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten); + assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime); } }