commit 6edef361ced67651faeb2f6d58ebf03c654f1e6c
Author: Sahil Takiar
Date:   Thu Feb 15 22:16:40 2018 -0800

    HIVE-18651: Expose additional Spark metrics

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
new file mode 100644
index 0000000000..be3b501110
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TestSparkStatistics {
+
+  @Test
+  public void testSparkStatistics() {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local-cluster[1,2,1024]");
+
+    SessionState.start(conf);
+
+    Driver driver = null;
+
+    try {
+      driver = new Driver(new QueryState.Builder()
+          .withGenerateNewQueryId(true)
+          .withHiveConf(conf).build(),
+          null, null);
+
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+      Assert.assertEquals(0, driver.compile("select * from test order by col"));
+
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+      Assert.assertEquals(1, sparkTasks.size());
+
+      SparkTask sparkTask = sparkTasks.get(0);
+
+      DriverContext driverCxt = new DriverContext(driver.getContext());
+      driverCxt.prepare(driver.getPlan());
+
+      sparkTask.initialize(driver.getQueryState(), driver.getPlan(), driverCxt, driver.getContext()
+          .getOpContext());
+      Assert.assertEquals(0, sparkTask.execute(driverCxt));
+
+      Assert.assertNotNull(sparkTask.getSparkStatistics());
+
+      List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics()
+          .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics());
+
+      Assert.assertEquals(18, sparkStats.size());
+
+      Map<String, String> statsMap = sparkStats.stream().collect(
+          Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue));
+
+      Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.TASK_DURATION_TIME)) > 0);
+      Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_CPU_TIME)) > 0);
+      Assert.assertTrue(
+          Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME)) > 0);
+      Assert.assertTrue(
+          Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME)) > 0);
+      Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_RUN_TIME)) > 0);
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index c2408844f5..3083e30ee3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -158,8 +159,7 @@ public int execute(DriverContext driverContext) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
         printExcessiveGCWarning();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
-          LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID));
-          logSparkStatistic(sparkStatistics);
+          LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
         }
         LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " + jobID +
             " and task ID " + getId());
@@ -250,17 +250,25 @@ private void addToHistory(Keys key, String value) {
     }
   }
 
-  private void logSparkStatistic(SparkStatistics sparkStatistic) {
+  @VisibleForTesting
+  static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJobID) {
+    StringBuilder sparkStatsString = new StringBuilder();
+    sparkStatsString.append("\n\n");
+    sparkStatsString.append(String.format("=====Spark Job[%d] Statistics=====", sparkJobID));
+    sparkStatsString.append("\n\n");
+
     Iterator<SparkStatisticGroup> groupIterator = sparkStatistic.getStatisticGroups();
     while (groupIterator.hasNext()) {
       SparkStatisticGroup group = groupIterator.next();
-      LOG.info(group.getGroupName());
+      sparkStatsString.append(group.getGroupName()).append("\n");
       Iterator<SparkStatistic> statisticIterator = group.getStatistics();
       while (statisticIterator.hasNext()) {
         SparkStatistic statistic = statisticIterator.next();
-        LOG.info("\t" + statistic.getName() + ": " + statistic.getValue());
+        sparkStatsString.append("\t").append(statistic.getName()).append(": ").append(
+            statistic.getValue()).append("\n");
       }
     }
+    return sparkStatsString.toString();
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index ca93a80392..68e4f9e456 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -23,11 +23,13 @@
 public class SparkStatisticsNames {
 
   public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  public static final String EXECUTOR_DESERIALIZE_CPU_TIME = "ExecutorDeserializeCpuTime";
   public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  public static final String EXECUTOR_CPU_TIME = "ExecutorCpuTime";
   public static final String RESULT_SIZE = "ResultSize";
   public static final String JVM_GC_TIME = "JvmGCTime";
   public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
-  public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+  public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
   public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
   public static final String BYTES_READ = "BytesRead";
   public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index f72407e512..fab5422f1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -31,11 +31,14 @@ private SparkMetricsUtils(){}
   static Map<String, Long> collectMetrics(Metrics allMetrics) {
     Map<String, Long> results = new LinkedHashMap<String, Long>();
     results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME,
+        allMetrics.executorDeserializeCpuTime);
     results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
     results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
     results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
     results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
-    results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+    results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
     results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
     results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 435c6b606b..75b4151118 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -96,6 +97,19 @@ public void testRemoteSparkCancel() {
     Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
   }
 
+  @Test
+  public void testSparkStatisticsToString() {
+    SparkStatisticsBuilder statsBuilder = new SparkStatisticsBuilder();
+    statsBuilder.add("TEST", "stat1", "1");
+    statsBuilder.add("TEST", "stat2", "1");
+    String statsString = SparkTask.sparkStatisticsToString(statsBuilder.build(), 10);
+
+    Assert.assertTrue(statsString.contains("10"));
+    Assert.assertTrue(statsString.contains("TEST"));
+    Assert.assertTrue(statsString.contains("stat1"));
+    Assert.assertTrue(statsString.contains("stat2"));
+    Assert.assertTrue(statsString.contains("1"));
+  }
   private boolean isEmptySparkWork(SparkWork sparkWork) {
     List<BaseWork> allWorks = sparkWork.getAllWork();
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 526aefd8b7..2f3c026212 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -141,7 +141,9 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
     try {
       // Task metrics.
       long executorDeserializeTime = 0L;
+      long executorDeserializeCpuTime = 0L;
       long executorRunTime = 0L;
+      long executorCpuTime = 0L;
       long resultSize = 0L;
       long jvmGCTime = 0L;
       long resultSerializationTime = 0L;
@@ -167,7 +169,9 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
       for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
         Metrics m = info.metrics;
         executorDeserializeTime += m.executorDeserializeTime;
+        executorDeserializeCpuTime += m.executorDeserializeCpuTime;
         executorRunTime += m.executorRunTime;
+        executorCpuTime += m.executorCpuTime;
         resultSize += m.resultSize;
         jvmGCTime += m.jvmGCTime;
         resultSerializationTime += m.resultSerializationTime;
@@ -217,7 +221,9 @@ private Metrics aggregate(Predicate<TaskInfo> filter) {
       return new Metrics(
           executorDeserializeTime,
+          executorDeserializeCpuTime,
           executorRunTime,
+          executorCpuTime,
           resultSize,
           jvmGCTime,
           resultSerializationTime,
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index 9da0116752..b718b3bd95 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client.metrics;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 
@@ -35,8 +36,12 @@
 
   /** Time taken on the executor to deserialize tasks. */
   public final long executorDeserializeTime;
+  /** CPU time taken on the executor to deserialize tasks. */
+  public final long executorDeserializeCpuTime;
   /** Time the executor spends actually running the task (including fetching shuffle data). */
   public final long executorRunTime;
+  /** CPU time the executor spends running the task (including fetching shuffle data). */
+  public final long executorCpuTime;
   /** The number of bytes sent back to the driver by tasks. */
   public final long resultSize;
   /** Amount of time the JVM spent in garbage collection while executing tasks. */
@@ -61,12 +66,14 @@
 
   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
   }
 
   public Metrics(
       long executorDeserializeTime,
+      long executorDeserializeCpuTime,
       long executorRunTime,
+      long executorCpuTime,
       long resultSize,
       long jvmGCTime,
       long resultSerializationTime,
@@ -77,7 +84,9 @@ public Metrics(
       ShuffleReadMetrics shuffleReadMetrics,
       ShuffleWriteMetrics shuffleWriteMetrics) {
     this.executorDeserializeTime = executorDeserializeTime;
+    this.executorDeserializeCpuTime = executorDeserializeCpuTime;
     this.executorRunTime = executorRunTime;
+    this.executorCpuTime = executorCpuTime;
     this.resultSize = resultSize;
     this.jvmGCTime = jvmGCTime;
     this.resultSerializationTime = resultSerializationTime;
@@ -92,7 +101,9 @@ public Metrics(
   public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
     this(
         metrics.executorDeserializeTime(),
+        TimeUnit.NANOSECONDS.toMillis(metrics.executorDeserializeCpuTime()),
         metrics.executorRunTime(),
+        TimeUnit.NANOSECONDS.toMillis(metrics.executorCpuTime()),
         metrics.resultSize(),
         metrics.jvmGCTime(),
         metrics.resultSerializationTime(),
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 87b460da7d..c5884cf06d 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -66,8 +66,8 @@ public void testMetricsAggregation() {
   @Test
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
-    Metrics metrics = new Metrics(value, value, value, value, value, value, value, value,
-        null, null, null);
+    Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value,
+        value, null, null, null);
     MetricsCollection collection = new MetricsCollection();
 
     for (int i : Arrays.asList(1, 2)) {
@@ -94,10 +94,11 @@ public void testInputReadMethodAggregation() {
     MetricsCollection collection = new MetricsCollection();
 
     long value = taskValue(1, 1, 1);
-    Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value,
-        new InputMetrics(value), null, null);
-    Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value,
-        new InputMetrics(value), null, null);
+
+    Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
+        value, new InputMetrics(value), null, null);
+    Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
+        value, new InputMetrics(value), null, null);
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
 
@@ -108,7 +109,7 @@ public void testInputReadMethodAggregation() {
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
-    return new Metrics(value, value, value, value, value, value, value, value,
+    return new Metrics(value, value, value, value, value, value, value, value, value, value,
         new InputMetrics(value),
         new ShuffleReadMetrics((int) value, (int) value, value, value),
         new ShuffleWriteMetrics(value, value));
@@ -148,7 +149,9 @@ private long globalValue(int jobCount, int stagesPerJob, int tasksPerStage) {
 
   private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.executorDeserializeTime);
+    assertEquals(expected, metrics.executorDeserializeCpuTime);
     assertEquals(expected, metrics.executorRunTime);
+    assertEquals(expected, metrics.executorCpuTime);
     assertEquals(expected, metrics.resultSize);
     assertEquals(expected, metrics.jvmGCTime);
     assertEquals(expected, metrics.resultSerializationTime);