commit c6c3cd023e73f5c5a3699d5bf643df089eb722ac Author: Sahil Takiar Date: Thu Feb 8 09:33:41 2018 -0800 HIVE-18034: Improve logging when HoS executors spend lots of time in GC diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 76f6ecc2b4..1f40c98a58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -30,6 +30,7 @@ import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -77,7 +78,10 @@ private static final LogHelper console = new LogHelper(LOG); private PerfLogger perfLogger; private static final long serialVersionUID = 1L; + // The id of the actual Spark job private transient int sparkJobID; + // The id of the JobHandle used to track the actual Spark job + private transient String sparkJobHandleId; private transient SparkStatistics sparkStatistics; private transient long submitTime; private transient long startTime; @@ -111,25 +115,47 @@ public int execute(DriverContext driverContext) { SparkWork sparkWork = getWork(); sparkWork.setRequiredCounterPrefix(getOperatorCounters()); + // Submit the Spark job perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB); jobRef = sparkSession.submit(driverContext, sparkWork); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); + // If the driver context has been shut down (due to query cancellation), kill the Spark job if (driverContext.isShutdown()) { LOG.warn("Killing Spark job"); killJob(); throw new HiveException("Operation is cancelled."); } - addToHistory(jobRef); - this.jobID = jobRef.getSparkJobStatus().getAppID(); + // Get the JobHandle id associated with the Spark job + sparkJobHandleId = jobRef.getJobId(); + + // Add the Spark job handle id to the Hive History + addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId()); + + LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId); + + // Get the application id of the Spark app + jobID = jobRef.getSparkJobStatus().getAppID(); + + // Start monitoring the Spark job; returns when the Spark job has completed / failed, or if + // a timeout occurs rc = jobRef.monitorJob(); + + // Get the id of the Spark job that was launched; returns -1 if no Spark job was launched + sparkJobID = jobRef.getSparkJobStatus().getJobId(); + + // Add the Spark job id to the Hive History + addToHistory(Keys.SPARK_JOB_ID, Integer.toString(sparkJobID)); + + // Get the final state of the Spark job and parse its job info SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - sparkJobID = sparkJobStatus.getJobId(); getSparkJobInfo(sparkJobStatus, rc); + if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); + printExcessiveGCWarning(); if (LOG.isInfoEnabled() && sparkStatistics != null) { LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID)); logSparkStatistic(sparkStatistics); @@ -140,7 +166,7 @@ public int execute(DriverContext driverContext) { // TODO: If the timeout is because of lack of resources in the cluster, we should // ideally also cancel the app request here.
But w/o facilities from Spark or YARN, // it's difficult to do it on hive side alone. See HIVE-12650. - LOG.info("Failed to submit Spark job " + sparkJobID); + LOG.info("Failed to submit Spark job for application id " + jobID); killJob(); } else if (rc == 4) { LOG.info("The spark job or one stage of it has too many tasks" + @@ -189,12 +215,35 @@ public int execute(DriverContext driverContext) { return rc; } - private void addToHistory(SparkJobRef jobRef) { - console.printInfo("Starting Spark Job = " + jobRef.getJobId()); + /** + * Use the Spark metrics and calculate how much task execution time was spent performing GC + * operations. If more than a defined threshold of time is spent, print out a warning on the + * console. + */ + private void printExcessiveGCWarning() { + SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup( + SparkStatisticsNames.SPARK_GROUP_NAME); + if (sparkStatisticGroup != null) { + long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic( + SparkStatisticsNames.TASK_DURATION_TIME).getValue()); + long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic( + SparkStatisticsNames.JVM_GC_TIME).getValue()); + + // Threshold fraction of task time above which the GC warning is printed + double threshold = 0.1; + + if (jvmGCTime > taskDurationTime * threshold) { + long percentGcTime = Math.round((double) jvmGCTime / taskDurationTime * 100); + String gcWarning = String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of " + + "task time in GC", sparkJobID, percentGcTime, jvmGCTime, taskDurationTime); + console.printInfo(gcWarning); + } + } + } + + private void addToHistory(Keys key, String value) { if (SessionState.get() != null) { - SessionState.get().getHiveHistory() - .setQueryProperty(queryState.getQueryId(), Keys.SPARK_JOB_ID, - Integer.toString(jobRef.getSparkJobStatus().getJobId())); + SessionState.get().getHiveHistory().setQueryProperty(queryState.getQueryId(), key, value); } } @@ -327,6 +376,7 @@ public void shutdown() { } private void killJob() { + LOG.debug("Killing Spark job with job handle id " + sparkJobHandleId); boolean needToKillJob = false; if (jobRef != null && !jobKilled) { synchronized (this) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java index 5ab4d16777..e1006e383e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java @@ -17,17 +17,20 @@ */ package org.apache.hadoop.hive.ql.exec.spark.Statistic; -import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; public class SparkStatisticGroup { private final String groupName; - private final List statisticList; + private final Map statistics = new LinkedHashMap<>(); SparkStatisticGroup(String groupName, List statisticList) { this.groupName = groupName; - this.statisticList = Collections.unmodifiableList(statisticList); + for (SparkStatistic sparkStatistic : statisticList) { + this.statistics.put(sparkStatistic.getName(), sparkStatistic); + } } public String getGroupName() { @@ -35,6 +38,13 @@ public String getGroupName() { } public Iterator getStatistics() { - return this.statisticList.iterator(); + return this.statistics.values().iterator(); + } + + /** + * Get a {@link SparkStatistic} by its given name + */ + public 
SparkStatistic getSparkStatistic(String name) { + return this.statistics.get(name); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java index 584e8bf8f6..946cadc0ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java @@ -17,19 +17,26 @@ */ package org.apache.hadoop.hive.ql.exec.spark.Statistic; - -import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; public class SparkStatistics { - private final List statisticGroups; - SparkStatistics(List statisticGroups) { - this.statisticGroups = Collections.unmodifiableList(statisticGroups); + private final Map statisticGroups = new LinkedHashMap<>(); + + SparkStatistics(List statisticGroupsList) { + for (SparkStatisticGroup group : statisticGroupsList) { + statisticGroups.put(group.getGroupName(), group); + } } public Iterator getStatisticGroups() { - return this.statisticGroups.iterator(); + return this.statisticGroups.values().iterator(); + } + + public SparkStatisticGroup getStatisticGroup(String groupName) { + return this.statisticGroups.get(groupName); } -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java index 6ebc274b28..d31d60a1f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java @@ -21,7 +21,7 @@ import org.apache.hive.spark.counter.SparkCounterGroup; import org.apache.hive.spark.counter.SparkCounters; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -31,15 +31,15 @@ private Map> statisticMap; public SparkStatisticsBuilder() { - statisticMap = new HashMap>(); + statisticMap = new LinkedHashMap<>(); } public SparkStatistics build() { List statisticGroups = new LinkedList(); for (Map.Entry> entry : statisticMap.entrySet()) { String groupName = entry.getKey(); - List statisitcList = entry.getValue(); - statisticGroups.add(new SparkStatisticGroup(groupName, statisitcList)); + List statisticList = entry.getValue(); + statisticGroups.add(new SparkStatisticGroup(groupName, statisticList)); } return new SparkStatistics(statisticGroups); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java new file mode 100644 index 0000000000..ca93a80392 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.Statistic; + +/** + * A collection of names that define different {@link SparkStatistic} objects. + */ +public class SparkStatisticsNames { + + public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime"; + public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime"; + public static final String RESULT_SIZE = "ResultSize"; + public static final String JVM_GC_TIME = "JvmGCTime"; + public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime"; + public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled"; + public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled"; + public static final String BYTES_READ = "BytesRead"; + public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched"; + public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched"; + public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched"; + public static final String FETCH_WAIT_TIME = "FetchWaitTime"; + public static final String REMOTE_BYTES_READ = "RemoteBytesRead"; + public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten"; + public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime"; + public static final String TASK_DURATION_TIME = "TaskDurationTime"; + + public static final String SPARK_GROUP_NAME = "SPARK"; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 3467ae4048..5cf7f4640d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -87,6 +87,7 @@ public int startMonitor() { } break; case STARTED: + console.printInfo("Starting Spark Job = " + sparkJobStatus.getJobId()); JobExecutionStatus sparkJobState = sparkJobStatus.getState(); if (sparkJobState == JobExecutionStatus.RUNNING) { Map progressMap = sparkJobStatus.getSparkStageProgress(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java index eaeb4dc5a9..773fe9700e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import java.util.AbstractMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.spark.scheduler.SparkListener; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.TaskInfo; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -38,7 +40,8 @@ private final Map jobIdToStageId = Maps.newHashMap(); private final Map stageIdToJobId = Maps.newHashMap(); - private final Map>> allJobMetrics = Maps.newHashMap(); + private final Map>>> allJobMetrics = + Maps.newHashMap(); @Override 
public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { @@ -47,17 +50,12 @@ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { if (jobId == null) { LOG.warn("Can not find job id for stage[" + stageId + "]."); } else { - Map> jobMetrics = allJobMetrics.get(jobId); - if (jobMetrics == null) { - jobMetrics = Maps.newHashMap(); - allJobMetrics.put(jobId, jobMetrics); - } - List stageMetrics = jobMetrics.get(stageId); - if (stageMetrics == null) { - stageMetrics = Lists.newLinkedList(); - jobMetrics.put(stageId, stageMetrics); - } - stageMetrics.add(taskEnd.taskMetrics()); + Map>> jobMetrics = allJobMetrics.computeIfAbsent( + jobId, k -> Maps.newHashMap()); + List> stageMetrics = jobMetrics.computeIfAbsent(stageId, + k -> Lists.newLinkedList()); + + stageMetrics.add(new AbstractMap.SimpleEntry<>(taskEnd.taskMetrics(), taskEnd.taskInfo())); } } @@ -74,7 +72,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobIdToStageId.put(jobId, intStageIds); } - public synchronized Map> getJobMetric(int jobId) { + public synchronized Map>> getJobMetric(int jobId) { return allJobMetrics.get(jobId); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 8b031e7d26..03f8a0b680 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -17,28 +17,31 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; -import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; import org.apache.hive.spark.client.MetricsCollection; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.counter.SparkCounters; + import org.apache.spark.JobExecutionStatus; import org.apache.spark.SparkJobInfo; import org.apache.spark.SparkStageInfo; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.TaskInfo; public class LocalSparkJobStatus implements SparkJobStatus { @@ -129,8 +132,7 @@ public SparkStatistics getSparkStatistics() { // add Hive operator level statistics. sparkStatisticsBuilder.add(sparkCounters); // add spark job metrics. 
- String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; - Map> jobMetric = jobMetricsListener.getJobMetric(jobId); + Map>> jobMetric = jobMetricsListener.getJobMetric(jobId); if (jobMetric == null) { return null; } @@ -138,16 +140,17 @@ public SparkStatistics getSparkStatistics() { MetricsCollection metricsCollection = new MetricsCollection(); Set stageIds = jobMetric.keySet(); for (int stageId : stageIds) { - List taskMetrics = jobMetric.get(stageId); - for (TaskMetrics taskMetric : taskMetrics) { - Metrics metrics = new Metrics(taskMetric); + List> taskMetrics = jobMetric.get(stageId); + for (Map.Entry taskMetric : taskMetrics) { + Metrics metrics = new Metrics(taskMetric.getKey(), taskMetric.getValue()); metricsCollection.addMetrics(jobId, stageId, 0, metrics); } } Map flatJobMetric = SparkMetricsUtils.collectMetrics(metricsCollection .getAllMetrics()); for (Map.Entry entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); + sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(), + Long.toString(entry.getValue())); } return sparkStatisticsBuilder.build(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index ec7ca40fb4..ff969e048f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -19,28 +19,30 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hive.spark.client.MetricsCollection; -import org.apache.hive.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; +import org.apache.hive.spark.client.MetricsCollection; import org.apache.hive.spark.client.Job; import org.apache.hive.spark.client.JobContext; import org.apache.hive.spark.client.JobHandle; import org.apache.hive.spark.client.SparkClient; +import org.apache.hive.spark.counter.SparkCounters; + import org.apache.spark.JobExecutionStatus; import org.apache.spark.SparkJobInfo; import org.apache.spark.SparkStageInfo; import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; -import java.net.InetAddress; -import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -125,16 +127,19 @@ public SparkStatistics getSparkStatistics() { if (metricsCollection == null || getCounter() == null) { return null; } + SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); - // add Hive operator level statistics. + + // add Hive operator level statistics. - e.g. RECORDS_IN, RECORDS_OUT sparkStatisticsBuilder.add(getCounter()); - // add spark job metrics. - String jobIdentifier = "Spark Job[" + getJobId() + "] Metrics"; + // add spark job metrics. - e.g. 
metrics collected by Spark itself (JvmGCTime, + // ExecutorRunTime, etc.) Map flatJobMetric = SparkMetricsUtils.collectMetrics( metricsCollection.getAllMetrics()); for (Map.Entry entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); + sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(), + Long.toString(entry.getValue())); } return sparkStatisticsBuilder.build(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java index dd171683d1..f72407e512 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java @@ -20,54 +20,40 @@ import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.metrics.ShuffleReadMetrics; final class SparkMetricsUtils { - private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime"; - private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime"; - private final static String RESULT_SIZE = "ResultSize"; - private final static String JVM_GC_TIME = "JvmGCTime"; - private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime"; - private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled"; - private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled"; - private final static String BYTES_READ = "BytesRead"; - private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched"; - private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched"; - private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched"; - private final static String FETCH_WAIT_TIME = "FetchWaitTime"; - private final static String REMOTE_BYTES_READ = "RemoteBytesRead"; - private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten"; - private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime"; - private SparkMetricsUtils(){} static Map collectMetrics(Metrics allMetrics) { Map results = new LinkedHashMap(); - results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime); - results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime); - results.put(RESULT_SIZE, allMetrics.resultSize); - results.put(JVM_GC_TIME, allMetrics.jvmGCTime); - results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime); - results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled); - results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); + results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime); + results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime); + results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize); + results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime); + results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime); + results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled); + results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); + results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime); if (allMetrics.inputMetrics != null) { - results.put(BYTES_READ, 
allMetrics.inputMetrics.bytesRead); + results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead); } if (allMetrics.shuffleReadMetrics != null) { ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics; long rbf = shuffleReadMetrics.remoteBlocksFetched; long lbf = shuffleReadMetrics.localBlocksFetched; - results.put(REMOTE_BLOCKS_FETCHED, rbf); - results.put(LOCAL_BLOCKS_FETCHED, lbf); - results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf); - results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime); - results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead); + results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf); + results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf); + results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf); + results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime); + results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead); } if (allMetrics.shuffleWriteMetrics != null) { - results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten); - results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime); + results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten); + results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime); } return results; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java index a5bafbca04..327628f8a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java @@ -69,6 +69,7 @@ TASK_NUM_MAPPERS, TASK_NUM_REDUCERS, ROWS_INSERTED, + SPARK_JOB_HANDLE_ID, SPARK_JOB_ID }; diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java index 0f03a64063..526aefd8b7 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java @@ -25,7 +25,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hive.spark.client.metrics.DataReadMethod; import org.apache.hive.spark.client.metrics.InputMetrics; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.metrics.ShuffleReadMetrics; @@ -148,6 +147,7 @@ private Metrics aggregate(Predicate filter) { long resultSerializationTime = 0L; long memoryBytesSpilled = 0L; long diskBytesSpilled = 0L; + long taskDurationTime = 0L; // Input metrics. 
boolean hasInputMetrics = false; @@ -173,6 +173,7 @@ private Metrics aggregate(Predicate filter) { resultSerializationTime += m.resultSerializationTime; memoryBytesSpilled += m.memoryBytesSpilled; diskBytesSpilled += m.diskBytesSpilled; + taskDurationTime += m.taskDurationTime; if (m.inputMetrics != null) { hasInputMetrics = true; @@ -222,6 +223,7 @@ private Metrics aggregate(Predicate filter) { resultSerializationTime, memoryBytesSpilled, diskBytesSpilled, + taskDurationTime, inputMetrics, shuffleReadMetrics, shuffleWriteMetrics); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index e584cbb0a7..f221d0a878 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -478,7 +478,7 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) { public void onTaskEnd(SparkListenerTaskEnd taskEnd) { if (taskEnd.reason() instanceof org.apache.spark.Success$ && !taskEnd.taskInfo().speculative()) { - Metrics metrics = new Metrics(taskEnd.taskMetrics()); + Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo()); Integer jobId; synchronized (stageToJobId) { jobId = stageToJobId.get(taskEnd.stageId()); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java index 418d53483f..9da0116752 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java @@ -19,10 +19,11 @@ import java.io.Serializable; -import org.apache.spark.executor.TaskMetrics; - import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.TaskInfo; + /** * Metrics tracked during the execution of a job. * @@ -46,6 +47,8 @@ public final long memoryBytesSpilled; /** The number of on-disk bytes spilled by tasks. */ public final long diskBytesSpilled; + /** Amount of time spent executing tasks. */ + public final long taskDurationTime; /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */ public final InputMetrics inputMetrics; /** @@ -58,7 +61,7 @@ private Metrics() { // For Serialization only. 
- this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); + this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); } public Metrics( @@ -69,6 +72,7 @@ public Metrics( long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, + long taskDurationTime, InputMetrics inputMetrics, ShuffleReadMetrics shuffleReadMetrics, ShuffleWriteMetrics shuffleWriteMetrics) { @@ -79,12 +83,13 @@ public Metrics( this.resultSerializationTime = resultSerializationTime; this.memoryBytesSpilled = memoryBytesSpilled; this.diskBytesSpilled = diskBytesSpilled; + this.taskDurationTime = taskDurationTime; this.inputMetrics = inputMetrics; this.shuffleReadMetrics = shuffleReadMetrics; this.shuffleWriteMetrics = shuffleWriteMetrics; } - public Metrics(TaskMetrics metrics) { + public Metrics(TaskMetrics metrics, TaskInfo taskInfo) { this( metrics.executorDeserializeTime(), metrics.executorRunTime(), @@ -93,6 +98,7 @@ public Metrics(TaskMetrics metrics) { metrics.resultSerializationTime(), metrics.memoryBytesSpilled(), metrics.diskBytesSpilled(), + taskInfo.duration(), optionalInputMetric(metrics), optionalShuffleReadMetric(metrics), optionalShuffleWriteMetrics(metrics)); diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java index 8fef66b238..87b460da7d 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java @@ -66,7 +66,7 @@ public void testMetricsAggregation() { @Test public void testOptionalMetrics() { long value = taskValue(1, 1, 1L); - Metrics metrics = new Metrics(value, value, value, value, value, value, value, + Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, null, null, null); MetricsCollection collection = new MetricsCollection(); @@ -94,10 +94,10 @@ public void testInputReadMethodAggregation() { MetricsCollection collection = new MetricsCollection(); long value = taskValue(1, 1, 1); - Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, - new InputMetrics(value), null, null); - Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, - new InputMetrics(value), null, null); + Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, + new InputMetrics(value), null, null); + Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, + new InputMetrics(value), null, null); collection.addMetrics(1, 1, 1, metrics1); collection.addMetrics(1, 1, 2, metrics2); @@ -108,7 +108,7 @@ public void testInputReadMethodAggregation() { private Metrics makeMetrics(int jobId, int stageId, long taskId) { long value = 1000000 * jobId + 1000 * stageId + taskId; - return new Metrics(value, value, value, value, value, value, value, + return new Metrics(value, value, value, value, value, value, value, value, new InputMetrics(value), new ShuffleReadMetrics((int) value, (int) value, value, value), new ShuffleWriteMetrics(value, value)); @@ -154,6 +154,7 @@ private void checkMetrics(Metrics metrics, long expected) { assertEquals(expected, metrics.resultSerializationTime); assertEquals(expected, metrics.memoryBytesSpilled); assertEquals(expected, metrics.diskBytesSpilled); + assertEquals(expected, metrics.taskDurationTime); assertEquals(expected, metrics.inputMetrics.bytesRead);
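
Illustrative note (a sketch, not part of the patch): the core of this change is the check added in SparkTask#printExcessiveGCWarning, which compares the aggregated JvmGCTime statistic against TaskDurationTime and prints a console warning when GC accounts for more than 10% of total task time. The standalone Java snippet below mirrors that arithmetic, assuming the two aggregated values are already available as longs in milliseconds; the names GcWarningSketch and buildGcWarning are invented for illustration and do not exist in Hive.

public class GcWarningSketch {

  // Warn when more than 10% of total task time was spent in GC (same threshold as the patch).
  private static final double GC_TIME_THRESHOLD = 0.1;

  // Returns the warning string, or null when GC time is under the threshold.
  static String buildGcWarning(int sparkJobId, long taskDurationTimeMs, long jvmGcTimeMs) {
    if (taskDurationTimeMs <= 0 || jvmGcTimeMs <= taskDurationTimeMs * GC_TIME_THRESHOLD) {
      return null;
    }
    long percentGcTime = Math.round((double) jvmGcTimeMs / taskDurationTimeMs * 100);
    return String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of task time in GC",
        sparkJobId, percentGcTime, jvmGcTimeMs, taskDurationTimeMs);
  }

  public static void main(String[] args) {
    // 2,000 ms of GC out of 10,000 ms of task time -> 20% -> warning string is returned.
    System.out.println(buildGcWarning(1, 10000L, 2000L));
    // 500 ms of GC out of 10,000 ms of task time -> 5% -> under the threshold, prints "null".
    System.out.println(buildGcWarning(1, 10000L, 500L));
  }
}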