diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index b48de3e..4a5e8cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -38,26 +38,24 @@
   private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
   private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
-  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+  private final Map<Integer, Map<Integer, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
 
   @Override
   public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
     int stageId = taskEnd.stageId();
-    int stageAttemptId = taskEnd.stageAttemptId();
-    String stageIdentifier = stageId + "_" + stageAttemptId;
     Integer jobId = stageIdToJobId.get(stageId);
     if (jobId == null) {
       LOG.warn("Can not find job id for stage[" + stageId + "].");
     } else {
-      Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+      Map<Integer, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
       if (jobMetrics == null) {
         jobMetrics = Maps.newHashMap();
         allJobMetrics.put(jobId, jobMetrics);
       }
-      List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+      List<TaskMetrics> stageMetrics = jobMetrics.get(stageId);
       if (stageMetrics == null) {
         stageMetrics = Lists.newLinkedList();
-        jobMetrics.put(stageIdentifier, stageMetrics);
+        jobMetrics.put(stageId, stageMetrics);
       }
       stageMetrics.add(taskEnd.taskMetrics());
     }
@@ -76,7 +74,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
     jobIdToStageId.put(jobId, intStageIds);
   }
 
-  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+  public synchronized Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) {
     return allJobMetrics.get(jobId);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index d4819d9..4e93979 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -36,14 +36,8 @@
 import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.executor.ShuffleReadMetrics;
-import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
 
-import scala.Option;
-
-import com.google.common.collect.Maps;
-
 public class LocalSparkJobStatus implements SparkJobStatus {
 
   private final JavaSparkContext sparkContext;
@@ -132,22 +126,21 @@ public SparkStatistics getSparkStatistics() {
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+    Map<Integer, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
     if (jobMetric == null) {
       return null;
     }
 
     MetricsCollection metricsCollection = new MetricsCollection();
-    Set<String> stageIds = jobMetric.keySet();
-    for (String stageId : stageIds) {
+    Set<Integer> stageIds = jobMetric.keySet();
+    for (int stageId : stageIds) {
       List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
       for (TaskMetrics taskMetric : taskMetrics) {
         Metrics metrics = new Metrics(taskMetric);
-        metricsCollection.addMetrics(jobId, Integer.parseInt(stageId), 0, metrics);
+        metricsCollection.addMetrics(jobId, stageId, 0, metrics);
       }
     }
-    SparkJobUtils sparkJobUtils = new SparkJobUtils();
-    Map<String, Long> flatJobMetric = sparkJobUtils.collectMetrics(metricsCollection
+    Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(metricsCollection
         .getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 2c6818f..9fc717f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -24,8 +24,6 @@
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.spark.client.MetricsCollection;
-import org.apache.hive.spark.client.metrics.Metrics;
-import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
@@ -40,7 +38,6 @@
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -125,8 +122,8 @@ public SparkStatistics getSparkStatistics() {
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
 
-    SparkJobUtils sparkJobUtils = new SparkJobUtils();
-    Map<String, Long> flatJobMetric = sparkJobUtils.collectMetrics(metricsCollection.getAllMetrics());
+    Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(
+        metricsCollection.getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
deleted file mode 100644
index eff208a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.spark.status.impl;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.hive.spark.client.metrics.Metrics;
-import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
-
-public class SparkJobUtils {
-
-  private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
-  private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
-  private final static String RESULT_SIZE = "ResultSize";
-  private final static String JVM_GC_TIME = "JvmGCTime";
-  private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
-  private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
-  private final static String DISK_BYTES_SPLIED = "DiskBytesSpilled";
-  private final static String BYTES_READ = "BytesRead";
-  private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
-  private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
-  private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
-  private final static String FETCH_WAIT_TIME = "FetchWaitTime";
-  private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
-  private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
-  private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
-
-  public Map<String, Long> collectMetrics(Metrics allMetrics) {
-    Map<String, Long> results = new LinkedHashMap<String, Long>();
-    results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
-    results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
-    results.put(RESULT_SIZE, allMetrics.resultSize);
-    results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
-    results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
-    results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
-    results.put(DISK_BYTES_SPLIED, allMetrics.diskBytesSpilled);
-    if (allMetrics.inputMetrics != null) {
-      results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
-    }
-    if (allMetrics.shuffleReadMetrics != null) {
-      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
-      long rbf = shuffleReadMetrics.remoteBlocksFetched;
-      long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put(REMOTE_BLOCKS_FETCHED, rbf);
-      results.put(LOCAL_BLOCKS_FETCHED, lbf);
-      results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
-      results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
-      results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
-    }
-    if (allMetrics.shuffleWriteMetrics != null) {
-      results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
-      results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
-    }
-    return results;
-  }
-
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
new file mode 100644
index 0000000..16ce073
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+
+final class SparkMetricsUtils {
+
+  private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  private final static String RESULT_SIZE = "ResultSize";
+  private final static String JVM_GC_TIME = "JvmGCTime";
+  private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+  private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+  private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+  private final static String BYTES_READ = "BytesRead";
+  private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+  private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+  private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+  private final static String FETCH_WAIT_TIME = "FetchWaitTime";
+  private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
+  private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+  private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+
+  private SparkMetricsUtils(){}
+
+  static Map<String, Long> collectMetrics(Metrics allMetrics) {
+    Map<String, Long> results = new LinkedHashMap<String, Long>();
+    results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(RESULT_SIZE, allMetrics.resultSize);
+    results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
+    results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+    results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    if (allMetrics.inputMetrics != null) {
+      results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
+    }
+    if (allMetrics.shuffleReadMetrics != null) {
+      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
+      long rbf = shuffleReadMetrics.remoteBlocksFetched;
+      long lbf = shuffleReadMetrics.localBlocksFetched;
+      results.put(REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
+      results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+    }
+    if (allMetrics.shuffleWriteMetrics != null) {
+      results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+    }
+    return results;
+  }
+
+}
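For illustration, the pattern this patch moves to (per-stage TaskMetrics grouped by the int stage id, then flattened into named long counters via a static SparkMetricsUtils.collectMetrics helper) can be sketched with plain java.util types. This is a minimal sketch, not Hive code; TaskMetricsStub and the class/method names below are hypothetical stand-ins for org.apache.spark.executor.TaskMetrics and the patched listener/utility.

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class StageMetricsSketch {

  // Hypothetical stand-in for org.apache.spark.executor.TaskMetrics.
  static class TaskMetricsStub {
    final long executorRunTime;
    TaskMetricsStub(long executorRunTime) {
      this.executorRunTime = executorRunTime;
    }
  }

  // Per-stage task metrics keyed by the int stage id, as in the patched listener.
  private final Map<Integer, List<TaskMetricsStub>> metricsByStage = new LinkedHashMap<>();

  void onTaskEnd(int stageId, TaskMetricsStub metrics) {
    metricsByStage.computeIfAbsent(stageId, id -> new LinkedList<>()).add(metrics);
  }

  // Flatten into named counters, mirroring the collectMetrics() style above.
  Map<String, Long> collect() {
    Map<String, Long> results = new LinkedHashMap<String, Long>();
    long totalRunTime = 0;
    for (List<TaskMetricsStub> stage : metricsByStage.values()) {
      for (TaskMetricsStub task : stage) {
        totalRunTime += task.executorRunTime;
      }
    }
    results.put("ExecutorRunTime", totalRunTime);
    return results;
  }

  public static void main(String[] args) {
    StageMetricsSketch sketch = new StageMetricsSketch();
    sketch.onTaskEnd(1, new TaskMetricsStub(120));
    sketch.onTaskEnd(1, new TaskMetricsStub(80));
    System.out.println(sketch.collect()); // prints {ExecutorRunTime=200}
  }
}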