commit f1eb5f48fae7cba4a49be698ebd13d8b7422ee62
Author: Sahil Takiar
Date:   Tue Feb 27 20:21:55 2018 -0800

    HIVE-18690: Integrate with Spark OutputMetrics

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c084fa054c..8d9659d4e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -31,7 +30,9 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +46,7 @@
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkMetricUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -83,25 +85,12 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.HiveStringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
-
 /**
  * File Sink operator implementation.
 **/
@@ -1244,6 +1233,10 @@ public void closeOp(boolean abort) throws HiveException {
       row_count.set(numRows);
       LOG.info(toString() + ": records written - " + numRows);

+      if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+        SparkMetricUtils.updateSparkRecordsWrittenMetrics(runTimeNumRows);
+      }
+
       if (!bDynParts && !filesCreated) {
         boolean skipFiles = "tez".equalsIgnoreCase(
             HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE));
@@ -1319,6 +1312,10 @@ public void closeOp(boolean abort) throws HiveException {
       if (conf.isGatherStats()) {
        publishStats();
       }
+
+      if ("spark".equals(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE))) {
+        SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths);
+      }
     } else {
       // Will come here if an Exception was thrown in map() or reduce().
       // Hadoop always call close() even if an Exception was thrown in map() or
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
new file mode 100644
index 0000000000..77fadb6679
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
@@ -0,0 +1,39 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.TaskContext;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+
+
+public class SparkMetricUtils {
+
+  public static void updateSparkRecordsWrittenMetrics(long numRows) {
+    TaskContext taskContext = TaskContext.get();
+    if (taskContext != null && numRows > 0) {
+      taskContext.taskMetrics().outputMetrics().setRecordsWritten(numRows);
+    }
+  }
+
+  public static void updateSparkBytesWrittenMetrics(Logger log, FileSystem fs, Path[]
+          commitPaths) {
+    TaskContext taskContext = TaskContext.get();
+    if (taskContext != null) {
+      long bytesWritten = 0;
+      for (Path path : commitPaths) {
+        try {
+          bytesWritten += fs.getFileStatus(path).getLen();
+        } catch (IOException e) {
+          log.debug("File " + path + " not yet visible; unable to collect file stats for this " +
+                  "file, output metrics may be inaccurate", e);
+        }
+      }
+      if (bytesWritten > 0) {
+        taskContext.taskMetrics().outputMetrics().setBytesWritten(bytesWritten);
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index 68e4f9e456..5a57b4021a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -41,5 +41,8 @@
   public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
   public static final String TASK_DURATION_TIME = "TaskDurationTime";

+  public static final String RECORDS_WRITTEN = "RecordsWritten";
+  public static final String BYTES_WRITTEN = "BytesWritten";
+
   public static final String SPARK_GROUP_NAME = "SPARK";
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index fab5422f1f..8fc4e7d1ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -58,6 +58,10 @@ private SparkMetricsUtils(){}
       results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
       results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
     }
+    if (allMetrics.outputMetrics != null) {
+      results.put(SparkStatisticsNames.BYTES_WRITTEN, allMetrics.outputMetrics.bytesWritten);
+      results.put(SparkStatisticsNames.RECORDS_WRITTEN, allMetrics.outputMetrics.recordsWritten);
+    }
     return results;
   }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 2f3c026212..2ac0f00174 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.OutputMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;

@@ -166,6 +167,11 @@ private Metrics aggregate(Predicate filter) {
       long shuffleBytesWritten = 0L;
       long shuffleWriteTime = 0L;

+      // Output metrics.
+      boolean hasOutputMetrics = false;
+      long bytesWritten = 0L;
+      long recordsWritten = 0L;
+
       for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
         Metrics m = info.metrics;
         executorDeserializeTime += m.executorDeserializeTime;
@@ -196,6 +202,12 @@ private Metrics aggregate(Predicate filter) {
           shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
           shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
         }
+
+        if (m.outputMetrics != null) {
+          hasOutputMetrics = true;
+          bytesWritten += m.outputMetrics.bytesWritten;
+          recordsWritten += m.outputMetrics.recordsWritten;
+        }
       }

       InputMetrics inputMetrics = null;
@@ -219,6 +231,11 @@ private Metrics aggregate(Predicate filter) {
             shuffleWriteTime);
       }

+      OutputMetrics outputMetrics = null;
+      if (hasOutputMetrics) {
+        outputMetrics = new OutputMetrics(bytesWritten, recordsWritten);
+      }
+
       return new Metrics(
           executorDeserializeTime,
           executorDeserializeCpuTime,
@@ -232,7 +249,8 @@ private Metrics aggregate(Predicate filter) {
           taskDurationTime,
           inputMetrics,
           shuffleReadMetrics,
-          shuffleWriteMetrics);
+          shuffleWriteMetrics,
+          outputMetrics);
     } finally {
       lock.readLock().unlock();
     }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index b718b3bd95..e70d7a336e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -63,10 +63,12 @@
   public final ShuffleReadMetrics shuffleReadMetrics;
   /** If tasks wrote to shuffle output, metrics on the written shuffle data. */
   public final ShuffleWriteMetrics shuffleWriteMetrics;
+  /** If tasks wrote output, metrics on the data written to external systems. */
+  public final OutputMetrics outputMetrics;

   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null, null);
   }

   public Metrics(
@@ -82,7 +84,8 @@ public Metrics(
       long taskDurationTime,
       InputMetrics inputMetrics,
       ShuffleReadMetrics shuffleReadMetrics,
-      ShuffleWriteMetrics shuffleWriteMetrics) {
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      OutputMetrics outputMetrics) {
     this.executorDeserializeTime = executorDeserializeTime;
     this.executorDeserializeCpuTime = executorDeserializeCpuTime;
     this.executorRunTime = executorRunTime;
@@ -96,6 +99,7 @@ public Metrics(
     this.inputMetrics = inputMetrics;
     this.shuffleReadMetrics = shuffleReadMetrics;
     this.shuffleWriteMetrics = shuffleWriteMetrics;
+    this.outputMetrics = outputMetrics;
   }

   public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
@@ -112,7 +116,8 @@ public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
         taskInfo.duration(),
         optionalInputMetric(metrics),
         optionalShuffleReadMetric(metrics),
-        optionalShuffleWriteMetrics(metrics));
+        optionalShuffleWriteMetrics(metrics),
+        optionalOutputMetrics(metrics));
   }

   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
@@ -127,4 +132,7 @@ private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metri
     return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }

+  private static OutputMetrics optionalOutputMetrics(TaskMetrics metrics) {
+    return (metrics.outputMetrics() != null) ? new OutputMetrics(metrics) : null;
+  }
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
new file mode 100644
index 0000000000..4529c6b0c8
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
@@ -0,0 +1,25 @@
+package org.apache.hive.spark.client.metrics;
+
+import org.apache.spark.executor.TaskMetrics;
+
+import java.io.Serializable;
+
+public class OutputMetrics implements Serializable {
+
+  public final long bytesWritten;
+  public final long recordsWritten;
+
+  private OutputMetrics() {
+    // For Serialization only.
+    this(0L, 0L);
+  }
+
+  public OutputMetrics(long bytesWritten, long recordsWritten) {
+    this.bytesWritten = bytesWritten;
+    this.recordsWritten = recordsWritten;
+  }
+
+  public OutputMetrics(TaskMetrics metrics) {
+    this(metrics.outputMetrics().bytesWritten(), metrics.outputMetrics().recordsWritten());
+  }
+}
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index c5884cf06d..36ff6e2905 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -26,6 +26,7 @@
 import org.apache.hive.spark.client.metrics.DataReadMethod;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.OutputMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
 import org.junit.Test;
@@ -67,7 +68,7 @@ public void testMetricsAggregation() {
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
     Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value,
-        value, null, null, null);
+        value, null, null, null, null);

     MetricsCollection collection = new MetricsCollection();
     for (int i : Arrays.asList(1, 2)) {
@@ -96,9 +97,9 @@ public void testInputReadMethodAggregation() {
     long value = taskValue(1, 1, 1);

     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
-        value, new InputMetrics(value), null, null);
+        value, new InputMetrics(value), null, null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
-        value, new InputMetrics(value), null, null);
+        value, new InputMetrics(value), null, null, null);

     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -112,7 +113,8 @@ private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     return new Metrics(value, value, value, value, value, value, value, value, value, value,
         new InputMetrics(value),
         new ShuffleReadMetrics((int) value, (int) value, value, value),
-        new ShuffleWriteMetrics(value, value));
+        new ShuffleWriteMetrics(value, value),
+        new OutputMetrics(value, value));
   }

   /**
@@ -168,6 +170,9 @@ private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
+
+    assertEquals(expected, metrics.outputMetrics.recordsWritten);
+    assertEquals(expected, metrics.outputMetrics.bytesWritten);
   }
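
Usage note (not part of the patch): a minimal, hypothetical sketch of the executor-side reporting path that the patch wires into FileSinkOperator.closeOp. The class name, method name, and literal values below are illustrative only; the TaskContext calls are the same ones SparkMetricUtils uses above.

import org.apache.spark.TaskContext;

public final class OutputMetricsSketch {
  // Illustrative helper: report output counters for the current Spark task, if any.
  public static void reportOutput(long recordsWritten, long bytesWritten) {
    TaskContext taskContext = TaskContext.get();
    // Outside a running Spark task (e.g. on the driver) TaskContext.get() returns null,
    // which is why SparkMetricUtils checks for null before touching the metrics.
    if (taskContext != null) {
      taskContext.taskMetrics().outputMetrics().setRecordsWritten(recordsWritten);
      taskContext.taskMetrics().outputMetrics().setBytesWritten(bytesWritten);
    }
  }
}

Once set on the task, these values flow back with the task metrics, are summed per job/stage in MetricsCollection.aggregate(), and surface as the RecordsWritten and BytesWritten statistics added to SparkStatisticsNames.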