diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index ebc5c16..b77f8f2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -129,8 +129,8 @@ public SparkStatistics getSparkStatistics() {
     if (jobMetric == null) {
       return null;
     }
-
-    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    SparkJobMetrics sparkJobMetrics = new SparkJobMetrics();
+    Map<String, Long> flatJobMetric = sparkJobMetrics.getMetrics(jobMetric);
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
     }
@@ -148,85 +148,6 @@ public void cleanup() {
     }
   }
 
-  private Map<String, Long> combineJobLevelMetrics(Map<Integer, List<TaskMetrics>> jobMetric) {
-    Map<String, Long> results = Maps.newLinkedHashMap();
-
-    long executorDeserializeTime = 0;
-    long executorRunTime = 0;
-    long resultSize = 0;
-    long jvmGCTime = 0;
-    long resultSerializationTime = 0;
-    long memoryBytesSpilled = 0;
-    long diskBytesSpilled = 0;
-    long bytesRead = 0;
-    long remoteBlocksFetched = 0;
-    long localBlocksFetched = 0;
-    long fetchWaitTime = 0;
-    long remoteBytesRead = 0;
-    long shuffleBytesWritten = 0;
-    long shuffleWriteTime = 0;
-    boolean inputMetricExist = false;
-    boolean shuffleReadMetricExist = false;
-    boolean shuffleWriteMetricExist = false;
-
-    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-      if (stageMetric != null) {
-        for (TaskMetrics taskMetrics : stageMetric) {
-          if (taskMetrics != null) {
-            executorDeserializeTime += taskMetrics.executorDeserializeTime();
-            executorRunTime += taskMetrics.executorRunTime();
-            resultSize += taskMetrics.resultSize();
-            jvmGCTime += taskMetrics.jvmGCTime();
-            resultSerializationTime += taskMetrics.resultSerializationTime();
-            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-            diskBytesSpilled += taskMetrics.diskBytesSpilled();
-            if (!taskMetrics.inputMetrics().isEmpty()) {
-              inputMetricExist = true;
-              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-            }
-            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-            if (!shuffleReadMetricsOption.isEmpty()) {
-              shuffleReadMetricExist = true;
-              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-            }
-            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-            if (!shuffleWriteMetricsOption.isEmpty()) {
-              shuffleWriteMetricExist = true;
-              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
-            }
-          }
-        }
-      }
-    }
-
-    results.put("ExecutorDeserializeTime", executorDeserializeTime);
-    results.put("ExecutorRunTime", executorRunTime);
-    results.put("ResultSize", resultSize);
-    results.put("JvmGCTime", jvmGCTime);
-    results.put("ResultSerializationTime", resultSerializationTime);
-    results.put("MemoryBytesSpilled", memoryBytesSpilled);
-    results.put("DiskBytesSpilled", diskBytesSpilled);
-    if (inputMetricExist) {
-      results.put("BytesRead", bytesRead);
-    }
-    if (shuffleReadMetricExist) {
-      results.put("RemoteBlocksFetched", remoteBlocksFetched);
-      results.put("LocalBlocksFetched", localBlocksFetched);
results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); - results.put("FetchWaitTime", fetchWaitTime); - results.put("RemoteBytesRead", remoteBytesRead); - } - if (shuffleWriteMetricExist) { - results.put("ShuffleBytesWritten", shuffleBytesWritten); - results.put("ShuffleWriteTime", shuffleWriteTime); - } - return results; - } - private SparkJobInfo getJobInfo() { return sparkContext.statusTracker().getJobInfo(jobId); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index e8d581f..b4de6af 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -114,7 +114,8 @@ public SparkStatistics getSparkStatistics() { // add spark job metrics. String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics"; - Map flatJobMetric = extractMetrics(metricsCollection); + SparkJobMetrics sparkJobMetrics = new SparkJobMetrics(); + Map flatJobMetric = sparkJobMetrics.getMetrics(metricsCollection); for (Map.Entry entry : flatJobMetric.entrySet()) { sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); } @@ -216,38 +217,6 @@ public SparkStageInfo call(JobContext jc) throws Exception { } } - private Map extractMetrics(MetricsCollection metricsCollection) { - Map results = new LinkedHashMap(); - Metrics allMetrics = metricsCollection.getAllMetrics(); - - results.put("ExecutorDeserializeTime", allMetrics.executorDeserializeTime); - results.put("ExecutorRunTime", allMetrics.executorRunTime); - results.put("ResultSize", allMetrics.resultSize); - results.put("JvmGCTime", allMetrics.jvmGCTime); - results.put("ResultSerializationTime", allMetrics.resultSerializationTime); - results.put("MemoryBytesSpilled", allMetrics.memoryBytesSpilled); - results.put("DiskBytesSpilled", allMetrics.diskBytesSpilled); - if (allMetrics.inputMetrics != null) { - results.put("BytesRead", allMetrics.inputMetrics.bytesRead); - } - if (allMetrics.shuffleReadMetrics != null) { - ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics; - long rbf = shuffleReadMetrics.remoteBlocksFetched; - long lbf = shuffleReadMetrics.localBlocksFetched; - results.put("RemoteBlocksFetched", rbf); - results.put("LocalBlocksFetched", lbf); - results.put("TotalBlocksFetched", lbf + rbf); - results.put("FetchWaitTime", shuffleReadMetrics.fetchWaitTime); - results.put("RemoteBytesRead", shuffleReadMetrics.remoteBytesRead); - } - if (allMetrics.shuffleWriteMetrics != null) { - results.put("ShuffleBytesWritten", allMetrics.shuffleWriteMetrics.shuffleBytesWritten); - results.put("ShuffleWriteTime", allMetrics.shuffleWriteMetrics.shuffleWriteTime); - } - - return results; - } - private static SparkJobInfo getDefaultJobInfo(final Integer jobId, final JobExecutionStatus status) { return new SparkJobInfo() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobMetrics.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobMetrics.java new file mode 100644 index 0000000..ddb081b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobMetrics.java @@ -0,0 +1,173 @@ +package org.apache.hadoop.hive.ql.exec.spark.status.impl; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import 
+import org.apache.hive.spark.client.MetricsCollection;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import scala.Option;
+
+/**
+ * Flattens Spark task/stage metrics into a single map of job-level counters,
+ * shared by LocalSparkJobStatus and RemoteSparkJobStatus. Instances hold
+ * mutable accumulator state and are not thread-safe.
+ */
+public class SparkJobMetrics {
+
+  private static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  private static final String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  private static final String RESULT_SIZE = "ResultSize";
+  private static final String JVM_GC_TIME = "JvmGCTime";
+  private static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+  private static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
+  private static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+  private static final String BYTES_READ = "BytesRead";
+  private static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+  private static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+  private static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+  private static final String FETCH_WAIT_TIME = "FetchWaitTime";
+  private static final String REMOTE_BYTES_READ = "RemoteBytesRead";
+  private static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+  private static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+
+  private Map<String, Long> results = new LinkedHashMap<String, Long>();
+  private long executorDeserializeTime = 0;
+  private long executorRunTime = 0;
+  private long resultSize = 0;
+  private long jvmGCTime = 0;
+  private long resultSerializationTime = 0;
+  private long memoryBytesSpilled = 0;
+  private long diskBytesSpilled = 0;
+  private long bytesRead = 0;
+  private long remoteBlocksFetched = 0;
+  private long localBlocksFetched = 0;
+  private long totalBlocksFetched = 0;
+  private long fetchWaitTime = 0;
+  private long remoteBytesRead = 0;
+  private long shuffleBytesWritten = 0;
+  private long shuffleWriteTime = 0;
+  private boolean inputMetricExist = false;
+  private boolean shuffleReadMetricExist = false;
+  private boolean shuffleWriteMetricExist = false;
+
+  /**
+   * Extracts job-level metrics from the aggregated {@link MetricsCollection}
+   * reported by the remote Spark client.
+   */
+  public Map<String, Long> getMetrics(MetricsCollection metricsCollection) {
+    cleanUp();
+    Metrics allMetrics = metricsCollection.getAllMetrics();
+    executorDeserializeTime = allMetrics.executorDeserializeTime;
+    executorRunTime = allMetrics.executorRunTime;
+    resultSize = allMetrics.resultSize;
+    jvmGCTime = allMetrics.jvmGCTime;
+    resultSerializationTime = allMetrics.resultSerializationTime;
+    memoryBytesSpilled = allMetrics.memoryBytesSpilled;
+    diskBytesSpilled = allMetrics.diskBytesSpilled;
+    if (allMetrics.inputMetrics != null) {
+      inputMetricExist = true;
+      bytesRead = allMetrics.inputMetrics.bytesRead;
+    }
+    if (allMetrics.shuffleReadMetrics != null) {
+      shuffleReadMetricExist = true;
+      org.apache.hive.spark.client.metrics.ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
+      remoteBlocksFetched = shuffleReadMetrics.remoteBlocksFetched;
+      localBlocksFetched = shuffleReadMetrics.localBlocksFetched;
+      totalBlocksFetched = localBlocksFetched + remoteBlocksFetched;
+      fetchWaitTime = shuffleReadMetrics.fetchWaitTime;
+      remoteBytesRead = shuffleReadMetrics.remoteBytesRead;
+    }
+    if (allMetrics.shuffleWriteMetrics != null) {
+      shuffleWriteMetricExist = true;
+      shuffleBytesWritten = allMetrics.shuffleWriteMetrics.shuffleBytesWritten;
+      shuffleWriteTime = allMetrics.shuffleWriteMetrics.shuffleWriteTime;
+    }
+    collectMetrics();
+    return results;
+  }
+
+  /**
+   * Sums per-task {@link TaskMetrics} collected for a local Spark job into
+   * job-level totals.
+   */
+  public Map<String, Long> getMetrics(Map<Integer, List<TaskMetrics>> jobMetric) {
+    cleanUp();
+    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+      if (stageMetric != null) {
+        for (TaskMetrics taskMetrics : stageMetric) {
+          if (taskMetrics != null) {
+            executorDeserializeTime += taskMetrics.executorDeserializeTime();
+            executorRunTime += taskMetrics.executorRunTime();
+            resultSize += taskMetrics.resultSize();
+            jvmGCTime += taskMetrics.jvmGCTime();
+            resultSerializationTime += taskMetrics.resultSerializationTime();
+            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+            diskBytesSpilled += taskMetrics.diskBytesSpilled();
+            if (!taskMetrics.inputMetrics().isEmpty()) {
+              inputMetricExist = true;
+              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+            }
+            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+            if (!shuffleReadMetricsOption.isEmpty()) {
+              shuffleReadMetricExist = true;
+              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+            }
+            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+            if (!shuffleWriteMetricsOption.isEmpty()) {
+              shuffleWriteMetricExist = true;
+              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+            }
+          }
+        }
+      }
+    }
+    // Derive the total from the per-task sums so the local path reports the
+    // same TotalBlocksFetched value as the removed combineJobLevelMetrics().
+    totalBlocksFetched = localBlocksFetched + remoteBlocksFetched;
+    collectMetrics();
+    return results;
+  }
+
+  /** Copies the accumulated counters into the result map, keyed by metric name. */
+  private Map<String, Long> collectMetrics() {
+    results.put(EXECUTOR_DESERIALIZE_TIME, executorDeserializeTime);
+    results.put(EXECUTOR_RUN_TIME, executorRunTime);
+    results.put(RESULT_SIZE, resultSize);
+    results.put(JVM_GC_TIME, jvmGCTime);
+    results.put(RESULT_SERIALIZATION_TIME, resultSerializationTime);
+    results.put(MEMORY_BYTES_SPILLED, memoryBytesSpilled);
+    results.put(DISK_BYTES_SPILLED, diskBytesSpilled);
+    if (inputMetricExist) {
+      results.put(BYTES_READ, bytesRead);
+    }
+    if (shuffleReadMetricExist) {
+      results.put(REMOTE_BLOCKS_FETCHED, remoteBlocksFetched);
+      results.put(LOCAL_BLOCKS_FETCHED, localBlocksFetched);
+      results.put(TOTAL_BLOCKS_FETCHED, totalBlocksFetched);
+      results.put(FETCH_WAIT_TIME, fetchWaitTime);
+      results.put(REMOTE_BYTES_READ, remoteBytesRead);
+    }
+    if (shuffleWriteMetricExist) {
+      results.put(SHUFFLE_BYTES_WRITTEN, shuffleBytesWritten);
+      results.put(SHUFFLE_WRITE_TIME, shuffleWriteTime);
+    }
+    return results;
+  }
+
+  /** Resets all accumulators so the instance can be reused for another job. */
+  private void cleanUp() {
+    results = new LinkedHashMap<String, Long>();
+    executorDeserializeTime = 0;
+    executorRunTime = 0;
+    resultSize = 0;
+    jvmGCTime = 0;
+    resultSerializationTime = 0;
+    memoryBytesSpilled = 0;
+    diskBytesSpilled = 0;
+    bytesRead = 0;
+    remoteBlocksFetched = 0;
+    localBlocksFetched = 0;
+    totalBlocksFetched = 0;
+    fetchWaitTime = 0;
+    remoteBytesRead = 0;
+    shuffleBytesWritten = 0;
+    shuffleWriteTime = 0;
+    inputMetricExist = false;
+    shuffleReadMetricExist = false;
+    shuffleWriteMetricExist = false;
+  }
+}