commit 6d98412141b354bb9ea465eff8139c727d943db1
Author: Sahil Takiar
Date:   Thu Feb 8 09:33:41 2018 -0800

    HIVE-18034: Improving logging when HoS executors spend lots of time in GC

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 62daaaa610..a22e8290e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -26,10 +26,13 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hive.spark.client.MetricsCollection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -77,7 +80,7 @@
   private static final LogHelper console = new LogHelper(LOG);
   private PerfLogger perfLogger;
   private static final long serialVersionUID = 1L;
-  private transient String sparkJobID;
+  private transient int sparkJobID;
   private transient SparkStatistics sparkStatistics;
   private transient long submitTime;
   private transient long startTime;
@@ -123,13 +126,14 @@ public int execute(DriverContext driverContext) {
       }
 
       addToHistory(jobRef);
-      sparkJobID = jobRef.getJobId();
       this.jobID = jobRef.getSparkJobStatus().getAppID();
       rc = jobRef.monitorJob();
+      sparkJobID = jobRef.getSparkJobStatus().getJobId();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       getSparkJobInfo(sparkJobStatus, rc);
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
+        printExcessiveGCWarning();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
           logSparkStatistic(sparkStatistics);
@@ -188,6 +192,32 @@ public int execute(DriverContext driverContext) {
     return rc;
   }
 
+
+  private void printExcessiveGCWarning() {
+    SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup
+        (SparkStatisticsNames.getSparkGroupName(sparkJobID));
+    if (sparkStatisticGroup != null) {
+      long taskDurationTime = sparkStatisticGroup.getSparkStatistic(
+          SparkStatisticsNames.TASK_DURATION_TIME).getValue();
+      long jvmGCTime = sparkStatisticGroup.getSparkStatistic(
+          SparkStatisticsNames.JVM_GC_TIME).getValue();
+      double threshold = 0.1;
+
+      if (jvmGCTime > taskDurationTime * threshold) {
+        String gcWarning = "WARNING: Spark Job[" +
+            sparkJobID +
+            "] Spent " +
+            Math.round((double) jvmGCTime / taskDurationTime * 100) +
+            "% (" +
+            jvmGCTime +
+            " ms / " +
+            taskDurationTime +
+            " ms) of task time in GC";
+        console.printInfo(gcWarning);
+      }
+    }
+  }
+
   private void addToHistory(SparkJobRef jobRef) {
     console.printInfo("Starting Spark Job = " + jobRef.getJobId());
     if (SessionState.get() != null) {
@@ -277,7 +307,7 @@ public String getName() {
     return ((ReduceWork) children.get(0)).getReducer();
   }
 
-  public String getSparkJobID() {
+  public int getSparkJobID() {
     return sparkJobID;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java
index a69773034a..555e48556e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java
@@ -19,14 +19,14 @@ public class SparkStatistic {
   private final String name;
-  private final String value;
+  private final Long value;
 
-  SparkStatistic(String name, String value) {
+  SparkStatistic(String name, Long value) {
     this.name = name;
     this.value = value;
   }
 
-  public String getValue() {
+  public Long getValue() {
     return value;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
index 5ab4d16777..4befdb47b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
@@ -18,16 +18,21 @@
 package org.apache.hadoop.hive.ql.exec.spark.Statistic;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SparkStatisticGroup {
   private final String groupName;
-  private final List<SparkStatistic> statisticList;
+  private final Map<String, SparkStatistic> statistics = new LinkedHashMap<>();
 
   SparkStatisticGroup(String groupName, List<SparkStatistic> statisticList) {
     this.groupName = groupName;
-    this.statisticList = Collections.unmodifiableList(statisticList);
+    for (SparkStatistic sparkStatistic : statisticList) {
+      this.statistics.put(sparkStatistic.getName(), sparkStatistic);
+    }
   }
 
   public String getGroupName() {
@@ -35,6 +40,10 @@ public String getGroupName() {
   }
 
   public Iterator<SparkStatistic> getStatistics() {
-    return this.statisticList.iterator();
+    return this.statistics.values().iterator();
+  }
+
+  public SparkStatistic getSparkStatistic(String name) {
+    return this.statistics.get(name);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
index 584e8bf8f6..0745c9d9f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
@@ -19,17 +19,27 @@
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SparkStatistics {
-  private final List<SparkStatisticGroup> statisticGroups;
-
-  SparkStatistics(List<SparkStatisticGroup> statisticGroups) {
-    this.statisticGroups = Collections.unmodifiableList(statisticGroups);
+
+  private final Map<String, SparkStatisticGroup> statisticGroups = new LinkedHashMap<>();
+
+  SparkStatistics(List<SparkStatisticGroup> statisticGroupsList) {
+    for (SparkStatisticGroup group : statisticGroupsList) {
+      statisticGroups.put(group.getGroupName(), group);
+    }
   }
 
   public Iterator<SparkStatisticGroup> getStatisticGroups() {
-    return this.statisticGroups.iterator();
+    return this.statisticGroups.values().iterator();
+  }
+
+  public SparkStatisticGroup getStatisticGroup(String groupName) {
+    return this.statisticGroups.get(groupName);
   }
-}
\ No newline at end of file
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
index 6ebc274b28..23f5f4bd85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
@@ -55,13 +55,13 @@ public SparkStatisticsBuilder add(SparkCounters sparkCounters) {
       }
       for (SparkCounter counter : counterGroup.getSparkCounters().values()) {
         String displayName = counter.getDisplayName();
-        statisticList.add(new SparkStatistic(displayName, Long.toString(counter.getValue())));
+        statisticList.add(new SparkStatistic(displayName, counter.getValue()));
       }
     }
     return this;
   }
 
-  public SparkStatisticsBuilder add(String groupName, String name, String value) {
+  public SparkStatisticsBuilder add(String groupName, String name, Long value) {
     List<SparkStatistic> statisticList = statisticMap.get(groupName);
     if (statisticList == null) {
       statisticList = new LinkedList();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
new file mode 100644
index 0000000000..3301ec516e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+public class SparkStatisticsNames {
+
+  public final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  public final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  public final static String RESULT_SIZE = "ResultSize";
+  public final static String JVM_GC_TIME = "JvmGCTime";
+  public final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+  public final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+  public final static String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+  public final static String BYTES_READ = "BytesRead";
+  public final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+  public final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+  public final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+  public final static String FETCH_WAIT_TIME = "FetchWaitTime";
+  public final static String REMOTE_BYTES_READ = "RemoteBytesRead";
+  public final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+  public final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+  public final static String TASK_DURATION_TIME = "TaskDurationTime";
+
+  private final static String SPARK_GROUP_NAME_PREFIX = "Spark job[";
+  private final static String SPARK_GROUP_NAME_SUFFIX = "]";
+
+  public static String getSparkGroupName(int sparkJobId) {
+    return SPARK_GROUP_NAME_PREFIX + Integer.toString(sparkJobId) + SPARK_GROUP_NAME_SUFFIX;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index eaeb4dc5a9..d6a252c675 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import java.util.AbstractMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.spark.scheduler.TaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +40,8 @@
 
   private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
   private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
-  private final Map<Integer, Map<Integer, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+  private final Map<Integer, Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>>> allJobMetrics =
+      Maps.newHashMap();
 
   @Override
   public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
@@ -47,17 +50,12 @@ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
     if (jobId == null) {
       LOG.warn("Can not find job id for stage[" + stageId + "].");
     } else {
-      Map<Integer, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
-      if (jobMetrics == null) {
-        jobMetrics = Maps.newHashMap();
-        allJobMetrics.put(jobId, jobMetrics);
-      }
-      List<TaskMetrics> stageMetrics = jobMetrics.get(stageId);
-      if (stageMetrics == null) {
-        stageMetrics = Lists.newLinkedList();
-        jobMetrics.put(stageId, stageMetrics);
-      }
-      stageMetrics.add(taskEnd.taskMetrics());
+      Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetrics = allJobMetrics.computeIfAbsent(
+          jobId, k -> Maps.newHashMap());
+      List<Map.Entry<TaskMetrics, TaskInfo>> stageMetrics = jobMetrics.computeIfAbsent(stageId,
+          k -> Lists.newLinkedList());
+
+      stageMetrics.add(new AbstractMap.SimpleEntry<>(taskEnd.taskMetrics(), taskEnd.taskInfo()));
     }
   }
 
@@ -74,7 +72,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
     jobIdToStageId.put(jobId, intStageIds);
   }
 
-  public synchronized Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) {
+  public synchronized Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> getJobMetric(int jobId) {
     return allJobMetrics.get(jobId);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 8b031e7d26..5dde0ee106 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.spark.scheduler.TaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
@@ -130,7 +131,7 @@ public SparkStatistics getSparkStatistics() {
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<Integer, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+    Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetric = jobMetricsListener.getJobMetric(jobId);
     if (jobMetric == null) {
       return null;
     }
@@ -138,16 +139,16 @@ public SparkStatistics getSparkStatistics() {
     MetricsCollection metricsCollection = new MetricsCollection();
     Set<Integer> stageIds = jobMetric.keySet();
     for (int stageId : stageIds) {
-      List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
-      for (TaskMetrics taskMetric : taskMetrics) {
-        Metrics metrics = new Metrics(taskMetric);
+      List<Map.Entry<TaskMetrics, TaskInfo>> taskMetrics = jobMetric.get(stageId);
+      for (Map.Entry<TaskMetrics, TaskInfo> taskMetric : taskMetrics) {
+        Metrics metrics = new Metrics(taskMetric.getKey(), taskMetric.getValue());
         metricsCollection.addMetrics(jobId, stageId, 0, metrics);
       }
     }
     Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(metricsCollection
         .getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
-      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), entry.getValue());
     }
 
     return sparkStatisticsBuilder.build();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e950452c31..a854302366 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.metrics.Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -44,6 +47,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -125,16 +129,18 @@ public SparkStatistics getSparkStatistics() {
     if (metricsCollection == null || getCounter() == null) {
       return null;
     }
+
     SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(getCounter());
-    // add spark job metrics.
-    String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
+
+    // add spark job metrics. - e.g. metrics collected by Spark itself
     Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(
         metricsCollection.getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
-      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+      sparkStatisticsBuilder.add(SparkStatisticsNames.getSparkGroupName(getJobId()), entry.getKey(),
+          entry.getValue());
     }
 
     return sparkStatisticsBuilder.build();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index dd171683d1..f72407e512 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -20,54 +20,40 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 
 final class SparkMetricsUtils {
 
-  private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
-  private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
-  private final static String RESULT_SIZE = "ResultSize";
-  private final static String JVM_GC_TIME = "JvmGCTime";
-  private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
-  private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
-  private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled";
-  private final static String BYTES_READ = "BytesRead";
-  private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
-  private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
-  private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
-  private final static String FETCH_WAIT_TIME = "FetchWaitTime";
-  private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
-  private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
-  private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
-
   private SparkMetricsUtils(){}
 
   static Map<String, Long> collectMetrics(Metrics allMetrics) {
     Map<String, Long> results = new LinkedHashMap<String, Long>();
-    results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
-    results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
-    results.put(RESULT_SIZE, allMetrics.resultSize);
-    results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
-    results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
-    results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
-    results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
+    results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
+    results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+    results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {
-      results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
+      results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead);
     }
     if (allMetrics.shuffleReadMetrics != null) {
       ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
       long rbf = shuffleReadMetrics.remoteBlocksFetched;
       long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put(REMOTE_BLOCKS_FETCHED, rbf);
-      results.put(LOCAL_BLOCKS_FETCHED, lbf);
-      results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
-      results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
-      results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+      results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
+      results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
     }
     if (allMetrics.shuffleWriteMetrics != null) {
-      results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
-      results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+      results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
     }
     return results;
   }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 0f03a64063..25617ba342 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hive.spark.client.metrics.DataReadMethod;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
@@ -148,6 +147,7 @@ private Metrics aggregate(Predicate filter) {
     long resultSerializationTime = 0L;
     long memoryBytesSpilled = 0L;
     long diskBytesSpilled = 0L;
+    long taskDurationTime = 0L;
 
     // Input metrics.
     boolean hasInputMetrics = false;
@@ -173,6 +173,7 @@ private Metrics aggregate(Predicate filter) {
       resultSerializationTime += m.resultSerializationTime;
       memoryBytesSpilled += m.memoryBytesSpilled;
       diskBytesSpilled += m.diskBytesSpilled;
+      taskDurationTime += m.taskDurationTime;
 
       if (m.inputMetrics != null) {
         hasInputMetrics = true;
@@ -224,7 +225,8 @@ private Metrics aggregate(Predicate filter) {
         diskBytesSpilled,
         inputMetrics,
         shuffleReadMetrics,
-        shuffleWriteMetrics);
+        shuffleWriteMetrics,
+        taskDurationTime);
     } finally {
       lock.readLock().unlock();
     }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e584cbb0a7..f221d0a878 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -478,7 +478,7 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {
     public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
       if (taskEnd.reason() instanceof org.apache.spark.Success$
           && !taskEnd.taskInfo().speculative()) {
-        Metrics metrics = new Metrics(taskEnd.taskMetrics());
+        Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo());
         Integer jobId;
         synchronized (stageToJobId) {
           jobId = stageToJobId.get(taskEnd.stageId());
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 418d53483f..fc9c24a153 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -22,6 +22,7 @@
 import org.apache.spark.executor.TaskMetrics;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.spark.scheduler.TaskInfo;
 
 /**
  * Metrics tracked during the execution of a job.
@@ -46,6 +47,8 @@
   public final long memoryBytesSpilled;
   /** The number of on-disk bytes spilled by tasks. */
   public final long diskBytesSpilled;
+  /** Amount of time spent executing tasks. */
+  public final long taskDurationTime;
   /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
   public final InputMetrics inputMetrics;
   /**
@@ -58,7 +61,7 @@
   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null, 0L);
   }
 
   public Metrics(
@@ -71,7 +74,8 @@ public Metrics(
       long diskBytesSpilled,
       InputMetrics inputMetrics,
       ShuffleReadMetrics shuffleReadMetrics,
-      ShuffleWriteMetrics shuffleWriteMetrics) {
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      long taskDurationTime) {
     this.executorDeserializeTime = executorDeserializeTime;
     this.executorRunTime = executorRunTime;
     this.resultSize = resultSize;
@@ -82,9 +86,10 @@ public Metrics(
     this.inputMetrics = inputMetrics;
     this.shuffleReadMetrics = shuffleReadMetrics;
     this.shuffleWriteMetrics = shuffleWriteMetrics;
+    this.taskDurationTime = taskDurationTime;
   }
 
-  public Metrics(TaskMetrics metrics) {
+  public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
     this(
       metrics.executorDeserializeTime(),
       metrics.executorRunTime(),
@@ -95,7 +100,8 @@ public Metrics(TaskMetrics metrics) {
       metrics.diskBytesSpilled(),
       optionalInputMetric(metrics),
       optionalShuffleReadMetric(metrics),
-      optionalShuffleWriteMetrics(metrics));
+      optionalShuffleWriteMetrics(metrics),
+      taskInfo.duration());
   }
 
   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 8fef66b238..535e0cd2de 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -67,7 +67,7 @@ public void testMetricsAggregation() {
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
     Metrics metrics = new Metrics(value, value, value, value, value, value, value,
-        null, null, null);
+        null, null, null, value);
 
     MetricsCollection collection = new MetricsCollection();
     for (int i : Arrays.asList(1, 2)) {
@@ -95,9 +95,9 @@ public void testInputReadMethodAggregation() {
     long value = taskValue(1, 1, 1);
 
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(value), null, null);
+        new InputMetrics(value), null, null, value);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(value), null, null);
+        new InputMetrics(value), null, null, value);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -111,7 +111,8 @@ private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     return new Metrics(value, value, value, value, value, value, value,
         new InputMetrics(value),
         new ShuffleReadMetrics((int) value, (int) value, value, value),
-        new ShuffleWriteMetrics(value, value),
+        new ShuffleWriteMetrics(value, value),
+        value);
   }
 
   /**
@@ -164,6 +165,8 @@ private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
+
+    assertEquals(expected, metrics.taskDurationTime);
  }
}
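
The GC check added to SparkTask reduces to a simple ratio test between two of the new statistics, TaskDurationTime and JvmGCTime. Below is a minimal standalone sketch of that logic; the class name, method name, and sample numbers are illustrative only and are not part of the patch:

public class GcOverheadCheckSketch {
  // Mirrors the 10% threshold used by SparkTask.printExcessiveGCWarning().
  private static final double GC_THRESHOLD = 0.1;

  /** Returns a warning line when GC time exceeds the threshold share of task time, otherwise null. */
  static String checkGcOverhead(int sparkJobId, long taskDurationMs, long jvmGcMs) {
    if (taskDurationMs <= 0 || jvmGcMs <= taskDurationMs * GC_THRESHOLD) {
      return null;
    }
    long percent = Math.round((double) jvmGcMs / taskDurationMs * 100);
    return "WARNING: Spark Job[" + sparkJobId + "] Spent " + percent + "% ("
        + jvmGcMs + " ms / " + taskDurationMs + " ms) of task time in GC";
  }

  public static void main(String[] args) {
    // Example: 12000 ms of GC out of 60000 ms of aggregated task time is 20%, so a warning is produced.
    System.out.println(checkGcOverhead(3, 60000L, 12000L));
  }
}

In the patch itself the two inputs come from the per-job statistic group (SparkStatisticsNames.TASK_DURATION_TIME and SparkStatisticsNames.JVM_GC_TIME), which is why SparkStatisticGroup gains the keyed getSparkStatistic(String) lookup.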