diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 7ab9a34..46b04bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -104,13 +105,16 @@ public int execute(DriverContext driverContext) {
       }
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
-      sparkCounters = jobRef.getSparkJobStatus().getCounter();
-      SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
+      SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+      sparkCounters = sparkJobStatus.getCounter();
+      SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
       monitor.startMonitor();
-      SparkStatistics sparkStatistics = jobRef.getSparkJobStatus().getSparkStatistics();
+      SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
       if (LOG.isInfoEnabled() && sparkStatistics != null) {
+        LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId()));
         logSparkStatistic(sparkStatistics);
       }
+      sparkJobStatus.cleanup();
       rc = 0;
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index f6cc581..bbc9fc3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -39,4 +39,5 @@
 
   public SparkStatistics getSparkStatistics();
 
+  public void cleanup();
 }
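Note: the intent of the new SparkJobStatus.cleanup() hook is that SparkTask consumes the counters and statistics first and only then releases whatever per-job state the status object has buffered on the driver. A minimal, self-contained sketch of that lifecycle follows; FakeJobStatus and its cache are hypothetical stand-ins, not Hive or Spark classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SparkJobStatus: it reads from a driver-side cache that is
// shared across jobs and exposes cleanup() so the caller can evict its own entry.
class FakeJobStatus {
  private final int jobId;
  private final Map<Integer, String> cache;

  FakeJobStatus(int jobId, Map<Integer, String> cache) {
    this.jobId = jobId;
    this.cache = cache;
  }

  String getStatistics() {
    return cache.getOrDefault(jobId, "<no metrics recorded>");
  }

  void cleanup() {
    cache.remove(jobId);  // without this, the cache grows with every submitted job
  }
}

public class CleanupLifecycleSketch {
  public static void main(String[] args) {
    Map<Integer, String> cache = new HashMap<>();
    cache.put(7, "ExecutorRunTime=360");

    FakeJobStatus status = new FakeJobStatus(7, cache);
    // Mirror SparkTask's order: log/consume the statistics first, then clean up.
    System.out.println(status.getStatistics());
    status.cleanup();
    System.out.println("cached jobs left: " + cache.size());  // prints 0
  }
}
```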
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
index b4f753f..e607095 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.JobSucceeded;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerApplicationEnd;
@@ -39,12 +44,14 @@
 import org.apache.spark.scheduler.SparkListenerTaskStart;
 import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
 
-import scala.collection.JavaConversions;
-
 public class JobStateListener implements SparkListener {
 
-  private Map<Integer, SparkJobState> jobIdToStates = new HashMap<Integer, SparkJobState>();
-  private Map<Integer, int[]> jobIdToStageId = new HashMap<Integer, int[]>();
+  private final static Log LOG = LogFactory.getLog(JobStateListener.class);
+
+  private final Map<Integer, SparkJobState> jobIdToStates = Maps.newHashMap();
+  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
 
   @Override
   public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
@@ -67,19 +74,40 @@ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult
   }
 
   @Override
-  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-
+  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+    int stageId = taskEnd.stageId();
+    int stageAttemptId = taskEnd.stageAttemptId();
+    String stageIdentifier = stageId + "_" + stageAttemptId;
+    Integer jobId = stageIdToJobId.get(stageId);
+    if (jobId == null) {
+      LOG.warn("Can not find job id for stage[" + stageId + "].");
+    } else {
+      Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+      if (jobMetrics == null) {
+        jobMetrics = Maps.newHashMap();
+        allJobMetrics.put(jobId, jobMetrics);
+      }
+      List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+      if (stageMetrics == null) {
+        stageMetrics = Lists.newLinkedList();
+        jobMetrics.put(stageIdentifier, stageMetrics);
+      }
+      stageMetrics.add(taskEnd.taskMetrics());
+    }
   }
 
   @Override
   public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-    jobIdToStates.put(jobStart.jobId(), SparkJobState.RUNNING);
-    List<Object> ids = JavaConversions.asJavaList(jobStart.stageIds());
-    int[] intStageIds = new int[ids.size()];
-    for(int i=0; i
+  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+    return allJobMetrics.get(jobId);
+  }
+
+  public synchronized void cleanup(int jobId) {
+    allJobMetrics.remove(jobId);
+    jobIdToStates.remove(jobId);
+    jobIdToStageId.remove(jobId);
+    Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, Integer> entry = iterator.next();
+      if (entry.getValue() == jobId) {
+        iterator.remove();
+      }
+    }
+  }
 }
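Note: the listener buffers TaskMetrics per job, keyed first by job id and within a job by a "stageId_attemptId" string, and it resolves the owning job through a stage-to-job map populated when the job starts; cleanup(jobId) then drops everything recorded for that job. The sketch below is a self-contained illustration of that bookkeeping, using plain JDK collections and a long in place of Spark's TaskMetrics; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Sketch of the listener's bookkeeping: jobId -> ("stageId_attemptId" -> per-task values).
public class JobMetricBufferSketch {
  private final Map<Integer, Integer> stageIdToJobId = new HashMap<>();
  private final Map<Integer, Map<String, List<Long>>> allJobMetrics = new HashMap<>();

  // Corresponds to onJobStart: remember which job each stage belongs to.
  public synchronized void registerStage(int stageId, int jobId) {
    stageIdToJobId.put(stageId, jobId);
  }

  // Corresponds to onTaskEnd: file the task's metric under its job and stage attempt.
  public synchronized void recordTask(int stageId, int attemptId, long metric) {
    Integer jobId = stageIdToJobId.get(stageId);
    if (jobId == null) {
      return;  // stage was never registered; nothing to record
    }
    String stageIdentifier = stageId + "_" + attemptId;
    allJobMetrics
        .computeIfAbsent(jobId, k -> new HashMap<>())
        .computeIfAbsent(stageIdentifier, k -> new LinkedList<>())
        .add(metric);
  }

  public synchronized Map<String, List<Long>> getJobMetric(int jobId) {
    return allJobMetrics.get(jobId);
  }

  // Corresponds to cleanup(jobId): drop the metrics and every stage mapping for the job.
  public synchronized void cleanup(int jobId) {
    allJobMetrics.remove(jobId);
    Iterator<Map.Entry<Integer, Integer>> it = stageIdToJobId.entrySet().iterator();
    while (it.hasNext()) {
      if (it.next().getValue() == jobId) {
        it.remove();
      }
    }
  }
}
```

The patch itself uses Guava's Maps.newHashMap()/Lists.newLinkedList() rather than the JDK factory calls shown here; the structure is the same.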
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
index 78e16c5..31a45d0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
@@ -22,12 +22,17 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.StageInfo;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.spark.ui.jobs.UIData;
@@ -124,7 +129,98 @@ public SparkCounters getCounter() {
 
   @Override
   public SparkStatistics getSparkStatistics() {
-    return new SparkStatisticsBuilder().add(sparkCounters).build();
+    SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+    // add Hive operator level statistics.
+    sparkStatisticsBuilder.add(sparkCounters);
+    // add spark job metrics.
+    String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
+    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+    }
+
+    return sparkStatisticsBuilder.build();
+  }
+
+  @Override
+  public void cleanup() {
+    jobStateListener.cleanup(jobId);
+  }
+
+  private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+    Map<String, Long> results = Maps.newLinkedHashMap();
+
+    long executorDeserializeTime = 0;
+    long executorRunTime = 0;
+    long resultSize = 0;
+    long jvmGCTime = 0;
+    long resultSerializationTime = 0;
+    long memoryBytesSpilled = 0;
+    long diskBytesSpilled = 0;
+    long bytesRead = 0;
+    long remoteBlocksFetched = 0;
+    long localBlocksFetched = 0;
+    long fetchWaitTime = 0;
+    long remoteBytesRead = 0;
+    long shuffleBytesWritten = 0;
+    long shuffleWriteTime = 0;
+    boolean inputMetricExist = false;
+    boolean shuffleReadMetricExist = false;
+    boolean shuffleWriteMetricExist = false;
+
+    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+      for (TaskMetrics taskMetrics : stageMetric) {
+        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+        executorRunTime += taskMetrics.executorRunTime();
+        resultSize += taskMetrics.resultSize();
+        jvmGCTime += taskMetrics.jvmGCTime();
+        resultSerializationTime += taskMetrics.resultSerializationTime();
+        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+        if (!taskMetrics.inputMetrics().isEmpty()) {
+          inputMetricExist = true;
+          bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+        }
+        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+        if (!shuffleReadMetricsOption.isEmpty()) {
+          shuffleReadMetricExist = true;
+          remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+          localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+          fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+          remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+        }
+        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+        if (!shuffleWriteMetricsOption.isEmpty()) {
+          shuffleWriteMetricExist = true;
+          shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+          shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+        }
+      }
+    }
+
+    results.put("ExecutorDeserializeTime", executorDeserializeTime);
+    results.put("ExecutorRunTime", executorRunTime);
+    results.put("ResultSize", resultSize);
+    results.put("JvmGCTime", jvmGCTime);
+    results.put("ResultSerializationTime", resultSerializationTime);
+    results.put("MemoryBytesSpilled", memoryBytesSpilled);
+    results.put("DiskBytesSpilled", diskBytesSpilled);
+    if (inputMetricExist) {
+      results.put("BytesRead", bytesRead);
+    }
+    if (shuffleReadMetricExist) {
+      results.put("RemoteBlocksFetched", remoteBlocksFetched);
+      results.put("LocalBlocksFetched", localBlocksFetched);
+      results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+      results.put("FetchWaitTime", fetchWaitTime);
+      results.put("RemoteBytesRead", remoteBytesRead);
+    }
+    if (shuffleWriteMetricExist) {
+      results.put("ShuffleBytesWritten", shuffleBytesWritten);
+      results.put("ShuffleWriteTime", shuffleWriteTime);
+    }
+    return results;
   }
 
   private List<StageInfo> getStageInfo(int stageId) {
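Note: combineJobLevelMetrics simply sums each per-task value across every stage of the job, and only emits the input/shuffle groups when at least one task actually reported them; each resulting entry is then added to the statistics builder under the "Spark Job[<id>] Metrics" group. Below is a small self-contained illustration of that aggregation, using a hypothetical TaskSample type in place of Spark's TaskMetrics.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineMetricsSketch {
  // Hypothetical per-task sample; bytesRead < 0 stands in for "no input metrics reported",
  // playing the role of the Option.isEmpty() checks in the patch.
  static final class TaskSample {
    final long runTimeMs;
    final long bytesRead;
    TaskSample(long runTimeMs, long bytesRead) {
      this.runTimeMs = runTimeMs;
      this.bytesRead = bytesRead;
    }
  }

  static Map<String, Long> combine(List<TaskSample> tasks) {
    long executorRunTime = 0;
    long bytesRead = 0;
    boolean inputMetricExist = false;
    for (TaskSample task : tasks) {
      executorRunTime += task.runTimeMs;
      if (task.bytesRead >= 0) {
        inputMetricExist = true;
        bytesRead += task.bytesRead;
      }
    }
    // LinkedHashMap keeps the metrics in insertion order, as the patch does.
    Map<String, Long> results = new LinkedHashMap<>();
    results.put("ExecutorRunTime", executorRunTime);
    if (inputMetricExist) {
      results.put("BytesRead", bytesRead);
    }
    return results;
  }

  public static void main(String[] args) {
    List<TaskSample> tasks = Arrays.asList(
        new TaskSample(120, 2048),
        new TaskSample(90, -1),    // task that reported no input metrics
        new TaskSample(150, 1024));
    System.out.println(combine(tasks));  // {ExecutorRunTime=360, BytesRead=3072}
  }
}
```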