commit 6f03eb8aa4c72a076af2ea4e9e1aa3b5ae62a6ef
Author: Sahil Takiar
Date:   Thu Feb 8 09:33:41 2018 -0800

    HIVE-18034: Improving logging when HoS executors spend lots of time in GC

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index eaeb4dc5a9..d6a252c675 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import java.util.AbstractMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.spark.scheduler.TaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +40,8 @@
   private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
   private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
-  private final Map<Integer, Map<Integer, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+  private final Map<Integer, Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>>> allJobMetrics =
+      Maps.newHashMap();
 
   @Override
   public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
@@ -47,17 +50,12 @@ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
     if (jobId == null) {
       LOG.warn("Can not find job id for stage[" + stageId + "].");
     } else {
-      Map<Integer, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
-      if (jobMetrics == null) {
-        jobMetrics = Maps.newHashMap();
-        allJobMetrics.put(jobId, jobMetrics);
-      }
-      List<TaskMetrics> stageMetrics = jobMetrics.get(stageId);
-      if (stageMetrics == null) {
-        stageMetrics = Lists.newLinkedList();
-        jobMetrics.put(stageId, stageMetrics);
-      }
-      stageMetrics.add(taskEnd.taskMetrics());
+      Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetrics = allJobMetrics.computeIfAbsent(
+          jobId, k -> Maps.newHashMap());
+      List<Map.Entry<TaskMetrics, TaskInfo>> stageMetrics = jobMetrics.computeIfAbsent(stageId,
+          k -> Lists.newLinkedList());
+
+      stageMetrics.add(new AbstractMap.SimpleEntry<>(taskEnd.taskMetrics(), taskEnd.taskInfo()));
     }
   }
 
@@ -74,7 +72,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
     jobIdToStageId.put(jobId, intStageIds);
   }
 
-  public synchronized Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) {
+  public synchronized Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> getJobMetric(int jobId) {
     return allJobMetrics.get(jobId);
   }
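The listener change above replaces the get/null-check/put idiom with Map.computeIfAbsent and records each finished task as a (TaskMetrics, TaskInfo) pair, since TaskInfo is what carries the task's duration. A minimal, self-contained sketch of the same pattern, assuming nothing beyond the JDK (the TaskLog class and its Long/Long pair are illustrative, not part of the patch):

    import java.util.AbstractMap;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    class TaskLog {
      // jobId -> stageId -> list of (runTime, gcTime) pairs, mirroring allJobMetrics
      private final Map<Integer, Map<Integer, List<Map.Entry<Long, Long>>>> all = new HashMap<>();

      synchronized void record(int jobId, int stageId, long runTime, long gcTime) {
        // computeIfAbsent creates the nested containers on first use
        all.computeIfAbsent(jobId, k -> new HashMap<>())
            .computeIfAbsent(stageId, k -> new LinkedList<>())
            .add(new AbstractMap.SimpleEntry<>(runTime, gcTime));
      }
    }
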
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 3e84175b69..388d2b0c63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.spark.scheduler.TaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
@@ -128,7 +129,7 @@ public SparkStatistics getSparkStatistics() {
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<Integer, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+    Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetric = jobMetricsListener.getJobMetric(jobId);
     if (jobMetric == null) {
       return null;
     }
@@ -136,9 +137,9 @@ public SparkStatistics getSparkStatistics() {
     MetricsCollection metricsCollection = new MetricsCollection();
     Set<Integer> stageIds = jobMetric.keySet();
     for (int stageId : stageIds) {
-      List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
-      for (TaskMetrics taskMetric : taskMetrics) {
-        Metrics metrics = new Metrics(taskMetric);
+      List<Map.Entry<TaskMetrics, TaskInfo>> taskMetrics = jobMetric.get(stageId);
+      for (Map.Entry<TaskMetrics, TaskInfo> taskMetric : taskMetrics) {
+        Metrics metrics = new Metrics(taskMetric.getKey(), taskMetric.getValue());
         metricsCollection.addMetrics(jobId, stageId, 0, metrics);
       }
     }
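On the consumer side, each entry returned by getJobMetric() carries the Spark TaskMetrics as its key and the TaskInfo as its value, and both are handed to the Metrics constructor. A rough sketch of that unpacking, with plain Long values standing in for the Spark types:

    import java.util.List;
    import java.util.Map;

    class UnpackExample {
      // Prints one line per task; the Long key/value stand in for TaskMetrics/TaskInfo.
      static void consume(Map<Integer, List<Map.Entry<Long, Long>>> jobMetric) {
        for (Map.Entry<Integer, List<Map.Entry<Long, Long>>> stage : jobMetric.entrySet()) {
          for (Map.Entry<Long, Long> task : stage.getValue()) {
            System.out.println("stage " + stage.getKey()
                + " metrics=" + task.getKey() + " info=" + task.getValue());
          }
        }
      }
    }
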
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ce07a9fadd..88560689ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.metrics.Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -122,6 +124,9 @@ public SparkStatistics getSparkStatistics() {
     if (metricsCollection == null || getCounter() == null) {
       return null;
     }
+
+    printExcessiveGCWarning(metricsCollection.getAllMetrics());
+
     SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(getCounter());
@@ -197,6 +202,18 @@ private SparkStageInfo getSparkStageInfo(int stageId) {
     return jobHandle.getState();
   }
 
+  private void printExcessiveGCWarning(Metrics allMetrics) {
+    long taskDurationTime = allMetrics.taskDurationTime;
+    long jvmGCTime = allMetrics.jvmGCTime;
+    double threshold = 0.1;
+
+    if (jvmGCTime > taskDurationTime * threshold) {
+      SessionState.getConsole().printInfo("WARNING: Spark Job[" + jobHandle.getClientJobId() + "]" +
+          " spent " + Math.round((double) jvmGCTime / taskDurationTime * 100) + "% of " +
+          "task time in GC");
+    }
+  }
+
   private static class GetJobInfoJob implements Job<SparkJobInfo> {
     private final String clientJobId;
     private final int sparkJobId;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index dd171683d1..3f2f5443db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -40,6 +40,7 @@
   private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
   private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
   private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+  private final static String TASK_DURATION_TIME = "TaskDurationTime";
 
   private SparkMetricsUtils(){}
 
@@ -52,6 +53,7 @@ private SparkMetricsUtils(){}
     results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
     results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
     results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    results.put(TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {
       results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
     }
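The warning in RemoteSparkJobStatus fires when the aggregated JVM GC time exceeds 10% of the aggregated task duration; note that the percentage must be computed in floating point, since a long/long division truncates the ratio to zero whenever GC time is below the task duration. A standalone sketch of the check, assuming System.err in place of the Hive console and a plain string job id:

    class GcWarningSketch {
      private static final double GC_THRESHOLD = 0.1;  // warn above 10% GC time

      static void maybeWarn(String jobId, long taskDurationTime, long jvmGCTime) {
        if (taskDurationTime > 0 && jvmGCTime > taskDurationTime * GC_THRESHOLD) {
          // cast before dividing so the ratio is not truncated to 0
          long pct = Math.round((double) jvmGCTime / taskDurationTime * 100);
          System.err.println("WARNING: Spark Job[" + jobId + "] spent " + pct + "% of task time in GC");
        }
      }

      public static void main(String[] args) {
        maybeWarn("job-1", 10_000L, 1_500L);  // prints a 15% warning
      }
    }
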
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 0f03a64063..50859c5b20 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hive.spark.client.metrics.DataReadMethod;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
@@ -148,6 +147,7 @@ private Metrics aggregate(Predicate filter) {
       long resultSerializationTime = 0L;
       long memoryBytesSpilled = 0L;
       long diskBytesSpilled = 0L;
+      long duration = 0L;
 
       // Input metrics.
       boolean hasInputMetrics = false;
@@ -173,6 +173,7 @@ private Metrics aggregate(Predicate filter) {
         resultSerializationTime += m.resultSerializationTime;
         memoryBytesSpilled += m.memoryBytesSpilled;
         diskBytesSpilled += m.diskBytesSpilled;
+        duration += m.taskDurationTime;
 
         if (m.inputMetrics != null) {
           hasInputMetrics = true;
@@ -224,7 +225,8 @@ private Metrics aggregate(Predicate filter) {
         diskBytesSpilled,
         inputMetrics,
         shuffleReadMetrics,
-        shuffleWriteMetrics);
+        shuffleWriteMetrics,
+        duration);
     } finally {
       lock.readLock().unlock();
     }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 66cf33942c..45dc429af5 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -491,7 +491,7 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {
     public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
       if (taskEnd.reason() instanceof org.apache.spark.Success$
          && !taskEnd.taskInfo().speculative()) {
-        Metrics metrics = new Metrics(taskEnd.taskMetrics());
+        Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo());
         Integer jobId;
         synchronized (stageToJobId) {
           jobId = stageToJobId.get(taskEnd.stageId());
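MetricsCollection.aggregate() folds the new per-task duration into the combined Metrics the same way it folds the other counters, so the GC check above compares GC time against the total time spent in all matching tasks rather than wall-clock time. A simplified sketch of that accumulation (TaskSample is an illustrative stand-in for the collection's internal per-task record):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;

    class AggregateSketch {
      static class TaskSample {
        final int jobId;
        final long taskDurationTime;

        TaskSample(int jobId, long taskDurationTime) {
          this.jobId = jobId;
          this.taskDurationTime = taskDurationTime;
        }
      }

      // Sum durations over the tasks accepted by the filter, as aggregate() does.
      static long totalDuration(List<TaskSample> tasks, Predicate<TaskSample> filter) {
        long duration = 0L;
        for (TaskSample t : tasks) {
          if (filter.test(t)) {
            duration += t.taskDurationTime;
          }
        }
        return duration;
      }

      public static void main(String[] args) {
        List<TaskSample> tasks = Arrays.asList(
            new TaskSample(1, 200L), new TaskSample(1, 300L), new TaskSample(2, 50L));
        System.out.println(totalDuration(tasks, t -> t.jobId == 1));  // prints 500
      }
    }
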
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 418d53483f..fc9c24a153 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -22,6 +22,7 @@
 import org.apache.spark.executor.TaskMetrics;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.spark.scheduler.TaskInfo;
 
 /**
  * Metrics tracked during the execution of a job.
@@ -46,6 +47,8 @@
   public final long memoryBytesSpilled;
   /** The number of on-disk bytes spilled by tasks. */
   public final long diskBytesSpilled;
+  /** Amount of time spent executing tasks. */
+  public final long taskDurationTime;
   /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
   public final InputMetrics inputMetrics;
   /**
@@ -58,7 +61,7 @@
   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null, 0L);
   }
 
   public Metrics(
@@ -71,7 +74,8 @@ public Metrics(
       long diskBytesSpilled,
       InputMetrics inputMetrics,
       ShuffleReadMetrics shuffleReadMetrics,
-      ShuffleWriteMetrics shuffleWriteMetrics) {
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      long taskDurationTime) {
     this.executorDeserializeTime = executorDeserializeTime;
     this.executorRunTime = executorRunTime;
     this.resultSize = resultSize;
@@ -82,9 +86,10 @@ public Metrics(
     this.inputMetrics = inputMetrics;
     this.shuffleReadMetrics = shuffleReadMetrics;
     this.shuffleWriteMetrics = shuffleWriteMetrics;
+    this.taskDurationTime = taskDurationTime;
   }
 
-  public Metrics(TaskMetrics metrics) {
+  public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
     this(
       metrics.executorDeserializeTime(),
       metrics.executorRunTime(),
@@ -95,7 +100,8 @@ public Metrics(TaskMetrics metrics) {
       metrics.diskBytesSpilled(),
       optionalInputMetric(metrics),
       optionalShuffleReadMetric(metrics),
-      optionalShuffleWriteMetrics(metrics));
+      optionalShuffleWriteMetrics(metrics),
+      taskInfo.duration());
   }
 
   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 8fef66b238..535e0cd2de 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -67,7 +67,7 @@ public void testMetricsAggregation() {
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
     Metrics metrics = new Metrics(value, value, value, value, value, value, value,
-        null, null, null);
+        null, null, null, value);
 
     MetricsCollection collection = new MetricsCollection();
     for (int i : Arrays.asList(1, 2)) {
@@ -95,9 +95,9 @@ public void testInputReadMethodAggregation() {
     long value = taskValue(1, 1, 1);
 
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(value), null, null);
+        new InputMetrics(value), null, null, value);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(value), null, null);
+        new InputMetrics(value), null, null, value);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -111,7 +111,8 @@ private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     return new Metrics(value, value, value, value, value, value, value,
       new InputMetrics(value),
       new ShuffleReadMetrics((int) value, (int) value, value, value),
-      new ShuffleWriteMetrics(value, value));
+      new ShuffleWriteMetrics(value, value),
+      value);
   }
 
   /**
@@ -164,6 +165,8 @@ private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
+
+    assertEquals(expected, metrics.taskDurationTime);
   }
 }
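Because taskDurationTime is a new trailing constructor argument on an immutable value class, every call site, including the tests, has to supply it; on the listener side the value is taken from TaskInfo.duration(). A small sketch of the same extend-and-assert pattern used in TestMetricsCollection (the TaskTiming class and the JUnit 4 usage are illustrative, not part of the patch):

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;

    public class TaskTimingTest {
      static final class TaskTiming {        // stand-in for the immutable Metrics class
        final long executorRunTime;
        final long taskDurationTime;         // newly added trailing field

        TaskTiming(long executorRunTime, long taskDurationTime) {
          this.executorRunTime = executorRunTime;
          this.taskDurationTime = taskDurationTime;
        }
      }

      @Test
      public void testNewFieldIsCarriedThrough() {
        TaskTiming timing = new TaskTiming(42L, 42L);
        assertEquals(42L, timing.executorRunTime);
        assertEquals(42L, timing.taskDurationTime);
      }
    }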