diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index de541ec..faa91e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -30,7 +30,7 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 507e6c6..f46c1b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -136,7 +136,7 @@ public Serializable call(JobContext jc) throws Exception {
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
       JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-      jc.monitor(future);
+      jc.monitor(future, sparkCounters);
       return null;
     }
   });
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
index 02a5329..fdf43f1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;

-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 6956bb9..30b7632 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -47,7 +47,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
@@ -103,9 +103,10 @@ public int execute(DriverContext driverContext) {
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
-        sparkCounters = sparkJobStatus.getCounter();
         SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
         monitor.startMonitor();
+        // for RSC, we should get the counters after the job has finished
+        sparkCounters = sparkJobStatus.getCounter();
         SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
index b4f2038..bf074a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.Statistic;

-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounter;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounterGroup;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounter;
+import org.apache.hive.spark.counter.SparkCounterGroup;
+import org.apache.hive.spark.counter.SparkCounters;

 import java.util.HashMap;
 import java.util.LinkedList;
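Note on the SparkTask hunk above: getCounter() is now called only after SparkJobMonitor.startMonitor() returns, because with the remote Spark client the counter values are shipped back together with the job result. A minimal sketch of that read-after-completion pattern, using only APIs that appear in this patch (the helper class and the group/counter names are illustrative, not part of Hive):

import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hive.spark.counter.SparkCounters;

// Illustrative helper: read an operator counter once the job monitor has reported completion.
final class CounterReadSketch {
  static long readCounter(SparkJobStatus status, String group, String counter) {
    // With the remote Spark context the counters travel back in the job result,
    // so this must only be called after the job has finished.
    SparkCounters counters = status.getCounter();
    return counters == null ? 0L : counters.getValue(group, counter);
  }
}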
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java
deleted file mode 100644
index cdf2e10..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.spark.counter;
-
-import java.io.Serializable;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public class SparkCounter implements Serializable {
-
-  private String name;
-  private String displayName;
-  private Accumulator<Long> accumulator;
-
-  public SparkCounter(
-      String name,
-      String displayName,
-      String groupName,
-      long initValue,
-      JavaSparkContext sparkContext) {
-
-    this.name = name;
-    this.displayName = displayName;
-    LongAccumulatorParam longParam = new LongAccumulatorParam();
-    String accumulatorName = groupName + "_" + name;
-    this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam);
-  }
-
-  public long getValue() {
-    return accumulator.value();
-  }
-
-  public void increment(long incr) {
-    accumulator.add(incr);
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public String getDisplayName() {
-    return displayName;
-  }
-
-  public void setDisplayName(String displayName) {
-    this.displayName = displayName;
-  }
-
-  class LongAccumulatorParam implements AccumulatorParam<Long> {
-
-    @Override
-    public Long addAccumulator(Long t1, Long t2) {
-      return t1 + t2;
-    }
-
-    @Override
-    public Long addInPlace(Long r1, Long r2) {
-      return r1 + r2;
-    }
-
-    @Override
-    public Long zero(Long initialValue) {
-      return 0L;
-    }
-  }
-
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java
deleted file mode 100644
index e4c912d..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.spark.counter;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * We use group to fold all the same kind of counters.
- */
-public class SparkCounterGroup implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private String groupName;
-  private String groupDisplayName;
-  private Map<String, SparkCounter> sparkCounters;
-
-  private transient JavaSparkContext javaSparkContext;
-
-  public SparkCounterGroup(
-      String groupName,
-      String groupDisplayName,
-      JavaSparkContext javaSparkContext) {
-
-    this.groupName = groupName;
-    this.groupDisplayName = groupDisplayName;
-    this.javaSparkContext = javaSparkContext;
-    sparkCounters = new HashMap<String, SparkCounter>();
-  }
-
-  public void createCounter(String name, long initValue) {
-    String displayName = ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
-    SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext);
-    sparkCounters.put(name, counter);
-  }
-
-  public SparkCounter getCounter(String name) {
-    return sparkCounters.get(name);
-  }
-
-  public String getGroupName() {
-    return groupName;
-  }
-
-  public String getGroupDisplayName() {
-    return groupDisplayName;
-  }
-
-  public void setGroupDisplayName(String groupDisplayName) {
-    this.groupDisplayName = groupDisplayName;
-  }
-
-  public Map<String, SparkCounter> getSparkCounters() {
-    return sparkCounters;
-  }
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
deleted file mode 100644
index 1753e78..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.spark.counter;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * SparkCounters is used to collect Hive operator metric through Spark accumulator. There are few
- * limitation of Spark accumulator, like:
- * 1. accumulator should be created at Spark context side.
- * 2. Spark tasks can only increment metric count.
- * 3. User can only get accumulator value at Spark context side.
- * These Spark Counter API is designed to fit into Hive requirement, while with several access
- * restriction due to Spark accumulator previous mentioned:
- * 1. Counter should be created on driver side if it would be accessed in task.
- * 2. increment could only be invoked task side.
- * 3. Hive could only get Counter value at driver side.
- */
-public class SparkCounters implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private static final Log LOG = LogFactory.getLog(SparkCounters.class);
-
-  private Map<String, SparkCounterGroup> sparkCounterGroups;
-
-  private transient JavaSparkContext javaSparkContext;
-  private transient Configuration hiveConf;
-
-  public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) {
-    this.javaSparkContext = javaSparkContext;
-    this.hiveConf = hiveConf;
-    sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
-  }
-
-  public void createCounter(Enum<?> key) {
-    createCounter(key.getDeclaringClass().getName(), key.name());
-  }
-
-  public void createCounter(String groupName, Enum<?> key) {
-    createCounter(groupName, key.name(), 0L);
-  }
-
-  public void createCounter(String groupName, String counterName) {
-    createCounter(groupName, counterName, 0L);
-  }
-
-  public void createCounter(String groupName, String counterName, long initValue) {
-    getGroup(groupName).createCounter(counterName, initValue);
-  }
-
-  public void increment(Enum<?> key, long incrValue) {
-    increment(key.getDeclaringClass().getName(), key.name(), incrValue);
-  }
-
-  public void increment(String groupName, String counterName, long value) {
-    SparkCounter counter = getGroup(groupName).getCounter(counterName);
-    if (counter == null) {
-      LOG.error(
-        String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
-    } else {
-      counter.increment(value);
-    }
-  }
-
-  public long getValue(String groupName, String counterName) {
-    SparkCounter counter = getGroup(groupName).getCounter(counterName);
-    if (counter == null) {
-      LOG.error(
-        String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
-      return 0;
-    } else {
-      return counter.getValue();
-    }
-  }
-
-  public SparkCounter getCounter(String groupName, String counterName) {
-    return getGroup(groupName).getCounter(counterName);
-  }
-
-  public SparkCounter getCounter(Enum<?> key) {
-    return getCounter(key.getDeclaringClass().getName(), key.name());
-  }
-
-  private SparkCounterGroup getGroup(String groupName) {
-    SparkCounterGroup group = sparkCounterGroups.get(groupName);
-    if (group == null) {
-      String groupDisplayName =
-        ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
-      group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext);
-      sparkCounterGroups.put(groupName, group);
-    }
-    return group;
-  }
-
-  public Map<String, SparkCounterGroup> getSparkCounterGroups() {
-    return sparkCounterGroups;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    Map<String, SparkCounterGroup> groups = getSparkCounterGroups();
-    if (groups != null) {
-      for (Map.Entry<String, SparkCounterGroup> groupEntry : groups.entrySet()) {
-        String groupName = groupEntry.getKey();
-        SparkCounterGroup group = groupEntry.getValue();
-        sb.append(groupName).append("\n");
-        Map<String, SparkCounter> counters = group.getSparkCounters();
-        for (Map.Entry<String, SparkCounter> counterEntry : counters.entrySet()) {
-          String counterName = counterEntry.getKey();
-          SparkCounter counter = counterEntry.getValue();
-          sb.append("\t")
-            .append(counterName)
-            .append(": ")
-            .append(counter.getValue())
-            .append("\n");
-        }
-      }
-    }
-
-    return sb.toString();
-  }
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index b5c1837..91ecefa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;

 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;

 import java.util.Map;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 09c79e4..b8cc6e6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -24,7 +24,7 @@
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.spark.JobExecutionStatus;
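The counter classes deleted above are re-created under spark-client later in this patch, so that both the local and the remote Hive Spark clients can share them. As a reminder of the life cycle those classes impose, a hedged sketch (the group and counter names are examples, not values used by Hive):

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative life cycle of the relocated counter classes.
final class CounterLifecycleSketch {
  static void run(JavaSparkContext sc, Configuration conf) {
    // 1. Counters must be created on the driver, before the job is submitted,
    //    because each counter is backed by a Spark accumulator.
    SparkCounters counters = new SparkCounters(sc, conf);
    counters.createCounter("EXAMPLE_GROUP", "RECORDS_OUT");

    // 2. Tasks may only increment; each increment feeds the underlying accumulator.
    counters.increment("EXAMPLE_GROUP", "RECORDS_OUT", 1L);

    // 3. Only the driver (or, after this patch, the client via the dumped values)
    //    can read the accumulated total back.
    long total = counters.getValue("EXAMPLE_GROUP", "RECORDS_OUT");
    System.out.println("RECORDS_OUT = " + total);
  }
}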
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 1c64482..055b2cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -21,7 +21,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hive.spark.client.MetricsCollection;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.hive.spark.client.Job;
@@ -37,6 +41,7 @@
 import java.io.Serializable;

 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;

@@ -97,12 +102,27 @@ public JobExecutionStatus getState() {

   @Override
   public SparkCounters getCounter() {
-    return null;
+    return jobHandle.getSparkCounters();
   }

   @Override
   public SparkStatistics getSparkStatistics() {
-    return null;
+    SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+    // add Hive operator level statistics.
+    sparkStatisticsBuilder.add(getCounter());
+    // add spark job metrics.
+    String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
+    MetricsCollection metricsCollection = jobHandle.getMetrics();
+    if (metricsCollection == null) {
+      return null;
+    }
+
+    Map<String, Long> flatJobMetric = extractMetrics(metricsCollection);
+    for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+    }
+
+    return sparkStatisticsBuilder.build();
   }

   @Override
@@ -194,7 +214,7 @@ public JobExecutionStatus status() {
         }
       }
     }
-    if(jobInfo == null) {
+    if (jobInfo == null) {
       jobInfo = new SparkJobInfo() {
         @Override
         public int jobId() {
@@ -216,11 +236,11 @@ public JobExecutionStatus status() {
     }
   }

-  private static class GetStageInfoJob implements Job<HiveSparkStageInfo>{
+  private static class GetStageInfoJob implements Job<HiveSparkStageInfo> {
     private final int stageId;

-    GetStageInfoJob(int stageId){
-      this.stageId=stageId;
+    GetStageInfoJob(int stageId) {
+      this.stageId = stageId;
     }

     @Override
@@ -229,4 +249,36 @@ public HiveSparkStageInfo call(JobContext jc) throws Exception {
       return stageInfo != null ? new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo();
     }
   }
+
+  private Map<String, Long> extractMetrics(MetricsCollection metricsCollection) {
+    Map<String, Long> results = new LinkedHashMap<String, Long>();
+    Metrics allMetrics = metricsCollection.getAllMetrics();
+
+    results.put("ExecutorDeserializeTime", allMetrics.executorDeserializeTime);
+    results.put("ExecutorRunTime", allMetrics.executorRunTime);
+    results.put("ResultSize", allMetrics.resultSize);
+    results.put("JvmGCTime", allMetrics.jvmGCTime);
+    results.put("ResultSerializationTime", allMetrics.resultSerializationTime);
+    results.put("MemoryBytesSpilled", allMetrics.memoryBytesSpilled);
+    results.put("DiskBytesSpilled", allMetrics.diskBytesSpilled);
+    if (allMetrics.inputMetrics != null) {
+      results.put("BytesRead", allMetrics.inputMetrics.bytesRead);
+    }
+    if (allMetrics.shuffleReadMetrics != null) {
+      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
+      long rbf = shuffleReadMetrics.remoteBlocksFetched;
+      long lbf = shuffleReadMetrics.localBlocksFetched;
+      results.put("RemoteBlocksFetched", rbf);
+      results.put("LocalBlocksFetched", lbf);
+      results.put("TotalBlocksFetched", lbf + rbf);
+      results.put("FetchWaitTime", shuffleReadMetrics.fetchWaitTime);
+      results.put("RemoteBytesRead", shuffleReadMetrics.remoteBytesRead);
+    }
+    if (allMetrics.shuffleWriteMetrics != null) {
+      results.put("ShuffleBytesWritten", allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put("ShuffleWriteTime", allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+    }
+
+    return results;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
index 0847ce7..03355a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
@@ -22,7 +22,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;

 public class CounterStatsAggregatorSpark
   implements StatsAggregator, StatsCollectionTaskIndependent {
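The new getSparkStatistics() implementation above merges two sources: Hive operator counters and the flattened Spark job metrics produced by extractMetrics(). The same assembly, pulled out of its anonymous context as a hedged sketch (the group label is shortened here; the patch itself uses "Spark Job[<client job id>] Metrics"):

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hive.spark.counter.SparkCounters;

// Illustrative only: how counters and job metrics end up in one SparkStatistics object.
final class StatisticsAssemblySketch {
  static SparkStatistics combine(SparkCounters counters, Map<String, Long> jobMetrics) {
    SparkStatisticsBuilder builder = new SparkStatisticsBuilder();
    builder.add(counters);                        // Hive operator-level counters
    for (Map.Entry<String, Long> e : jobMetrics.entrySet()) {
      builder.add("Spark Job Metrics", e.getKey(), Long.toString(e.getValue()));
    }
    return builder.build();
  }
}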
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 0fabba4..8565bd8 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -17,6 +17,7 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;

@@ -43,7 +44,7 @@
    *
    * @return The job (unmodified).
    */
-  <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job);
+  <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job, SparkCounters sparkCounters);

  /**
   * Return a map from client job Id to corresponding JavaFutureActions
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index e58191d..137539b 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -17,6 +17,7 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;

@@ -43,8 +44,8 @@ public JavaSparkContext sc() {
   }

   @Override
-  public <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job) {
-    monitorCb.get().call(job);
+  public <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job, SparkCounters sparkCounters) {
+    monitorCb.get().call(job, sparkCounters);
     return job;
   }

diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
index 0a1ae7d..fd5daf4 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.Future;

 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.counter.SparkCounters;

 /**
  * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
@@ -48,6 +49,11 @@
    */
  List<Integer> getSparkJobIds();

+  /**
+   * Get the SparkCounters for this job
+   */
+  SparkCounters getSparkCounters();
+
  // TODO: expose job status?

 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 5f27e7e..806e1ea 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -17,6 +17,8 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -41,6 +43,7 @@
  private Throwable error;

  private final List<Integer> sparkJobIds;
+  private SparkCounters sparkCounters;

  JobHandleImpl(SparkClientImpl client, String jobId) {
    this.client = client;
@@ -50,6 +53,7 @@
    this.cancelled = new AtomicBoolean();
    this.completed = false;
    this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
+    sparkCounters = null;
  }

  /** Requests a running job to be cancelled. */
@@ -113,6 +117,11 @@ public MetricsCollection getMetrics() {
    return sparkJobIds;
  }

+  @Override
+  public SparkCounters getSparkCounters() {
+    return sparkCounters;
+  }
+
  private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeout;
    synchronized (monitor) {
@@ -150,4 +159,7 @@ void complete(Object result, Throwable error) {
    }
  }

+  public void setSparkCounters(SparkCounters sparkCounters) {
+    this.sparkCounters = sparkCounters;
+  }
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 52428f0..6619c88 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -17,7 +17,6 @@

 package org.apache.hive.spark.client;

-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
diff --git spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
index 0ef4296..3693f8f 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
@@ -17,10 +17,11 @@

 package org.apache.hive.spark.client;

+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.api.java.JavaFutureAction;

 interface MonitorCallback {

-  void call(JavaFutureAction<?> future);
+  void call(JavaFutureAction<?> future, SparkCounters sparkCounters);

 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
index ed94861..68295f3 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
@@ -20,6 +20,7 @@
 import java.io.Serializable;

 import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.counter.SparkCounters;

 final class Protocol {

@@ -112,15 +113,17 @@
    final String id;
    final T result;
    final Throwable error;
+    final SparkCounters sparkCounters;

-    JobResult(String id, T result, Throwable error) {
+    JobResult(String id, T result, Throwable error, SparkCounters sparkCounters) {
      this.id = id;
      this.result = result;
      this.error = error;
+      this.sparkCounters = sparkCounters;
    }

    JobResult() {
-      this(null, null, null);
+      this(null, null, null, null);
    }

  }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 6dbe45a..cbe06d8 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;

+import org.apache.hive.spark.counter.SparkCounters;
 import scala.Tuple2;

 import akka.actor.ActorRef;
@@ -196,6 +197,7 @@ public void onReceive(Object message) throws Exception {
    private final Protocol.JobRequest<T> req;
    private final List<JavaFutureAction<?>> jobs;
    private final AtomicInteger completed;
+    private SparkCounters sparkCounters;

    private Future<?> future;

@@ -203,6 +205,7 @@ public void onReceive(Object message) throws Exception {
      this.req = req;
      this.jobs = Lists.newArrayList();
      this.completed = new AtomicInteger();
+      this.sparkCounters = null;
    }

    @Override
@@ -210,8 +213,8 @@ public Void call() throws Exception {
      try {
        jc.setMonitorCb(new MonitorCallback() {
          @Override
-          public void call(JavaFutureAction<?> future) {
-            monitorJob(future);
+          public void call(JavaFutureAction<?> future, SparkCounters sparkCounters) {
+            monitorJob(future, sparkCounters);
          }
        });

@@ -223,13 +226,16 @@ public void call(JavaFutureAction<?> future) {
            completed.wait();
          }
        }
-        client.tell(new Protocol.JobResult(req.id, result, null), actor);
+        if (sparkCounters != null) {
+          sparkCounters.dumpAllCounters();
+        }
+        client.tell(new Protocol.JobResult(req.id, result, null, sparkCounters), actor);
      } catch (Throwable t) {
-          // Catch throwables in a best-effort to report job status back to the client. It's
-          // re-thrown so that the executor can destroy the affected thread (or the JVM can
-          // die or whatever would happen if the throwable bubbled up).
-          client.tell(new Protocol.JobResult(req.id, null, t), actor);
-          throw new ExecutionException(t);
+        // Catch throwables in a best-effort to report job status back to the client. It's
+        // re-thrown so that the executor can destroy the affected thread (or the JVM can
+        // die or whatever would happen if the throwable bubbled up).
+        client.tell(new Protocol.JobResult(req.id, null, t, null), actor);
+        throw new ExecutionException(t);
      } finally {
        jc.setMonitorCb(null);
        activeJobs.remove(req.id);
@@ -248,12 +254,13 @@ void jobDone() {
      }
    }

-    private void monitorJob(JavaFutureAction<?> job) {
+    private void monitorJob(JavaFutureAction<?> job, SparkCounters sparkCounters) {
      jobs.add(job);
      if (!jc.getMonitoredJobs().containsKey(req.id)) {
        jc.getMonitoredJobs().put(req.id, new CopyOnWriteArrayList<JavaFutureAction<?>>());
      }
      jc.getMonitoredJobs().get(req.id).add(job);
+      this.sparkCounters = sparkCounters;
      client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor);
    }

diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 0aea4a2..4a1ffee 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -323,6 +323,7 @@ public void onReceive(Object message) throws Exception {
        JobHandleImpl<?> handle = jobs.remove(jr.id);
        if (handle != null) {
          LOG.info("Received result for {}", jr.id);
+          handle.setSparkCounters(jr.sparkCounters);
          handle.complete(jr.result, jr.error);
        } else {
          LOG.warn("Received result for unknown job {}", jr.id);
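Taken together, the spark-client changes above wire the counters through the remote protocol: a job registers its counters via JobContext.monitor(), RemoteDriver dumps their values and attaches them to Protocol.JobResult, and SparkClientImpl copies them onto the JobHandle. A hedged sketch of a remote job that participates in this flow (the job body and names are stand-ins; only the monitor() call and the counter APIs come from this patch):

import java.io.Serializable;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.function.VoidFunction;

// Illustrative remote job: create counters, run an async action, and hand both to
// the monitor callback so the driver can return the counters with the job result.
class CountingJobSketch implements Job<Serializable> {
  @Override
  public Serializable call(JobContext jc) throws Exception {
    final SparkCounters counters = new SparkCounters(jc.sc(), new Configuration());
    counters.createCounter("EXAMPLE_GROUP", "PROCESSED");

    JavaFutureAction<Void> future = jc.sc().parallelize(Arrays.asList(1, 2, 3))
        .foreachAsync(new VoidFunction<Integer>() {
          @Override
          public void call(Integer i) {
            counters.increment("EXAMPLE_GROUP", "PROCESSED", 1L);
          }
        });

    jc.monitor(future, counters);  // RemoteDriver will dump and ship these counters
    future.get();
    return null;
  }
}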

diff --git spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounter.java spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounter.java
new file mode 100644
index 0000000..21de693
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounter.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.counter;
+
+import java.io.Serializable;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkCounter implements Serializable {
+
+  private String name;
+  private String displayName;
+  private Accumulator<Long> accumulator;
+  // Values of accumulators can only be read on the SparkContext side.
+  // In the RSC case, we have to keep a copy of the data here.
+  private long accumValue = -1;
+
+  public SparkCounter(
+      String name,
+      String displayName,
+      String groupName,
+      long initValue,
+      JavaSparkContext sparkContext) {
+
+    this.name = name;
+    this.displayName = displayName;
+    LongAccumulatorParam longParam = new LongAccumulatorParam();
+    String accumulatorName = groupName + "_" + name;
+    this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam);
+  }
+
+  public long getValue() {
+    try {
+      return accumulator.value();
+    } catch (UnsupportedOperationException e) {
+      return accumValue;
+    }
+  }
+
+  public void increment(long incr) {
+    accumulator.add(incr);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDisplayName() {
+    return displayName;
+  }
+
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  public void dumpValue() {
+    accumValue = accumulator.value();
+  }
+
+  class LongAccumulatorParam implements AccumulatorParam<Long> {
+
+    @Override
+    public Long addAccumulator(Long t1, Long t2) {
+      return t1 + t2;
+    }
+
+    @Override
+    public Long addInPlace(Long r1, Long r2) {
+      return r1 + r2;
+    }
+
+    @Override
+    public Long zero(Long initialValue) {
+      return 0L;
+    }
+  }
+
+}
diff --git spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounterGroup.java spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounterGroup.java
new file mode 100644
index 0000000..ea1b7ee
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounterGroup.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * We use group to fold all the same kind of counters.
+ */
+public class SparkCounterGroup implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private String groupName;
+  private String groupDisplayName;
+  private Map<String, SparkCounter> sparkCounters;
+
+  private transient JavaSparkContext javaSparkContext;
+
+  public SparkCounterGroup(
+      String groupName,
+      String groupDisplayName,
+      JavaSparkContext javaSparkContext) {
+
+    this.groupName = groupName;
+    this.groupDisplayName = groupDisplayName;
+    this.javaSparkContext = javaSparkContext;
+    sparkCounters = new HashMap<String, SparkCounter>();
+  }
+
+  public void createCounter(String name, long initValue) {
+    String displayName = ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
+    SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext);
+    sparkCounters.put(name, counter);
+  }
+
+  public SparkCounter getCounter(String name) {
+    return sparkCounters.get(name);
+  }
+
+  public String getGroupName() {
+    return groupName;
+  }
+
+  public String getGroupDisplayName() {
+    return groupDisplayName;
+  }
+
+  public void setGroupDisplayName(String groupDisplayName) {
+    this.groupDisplayName = groupDisplayName;
+  }
+
+  public Map<String, SparkCounter> getSparkCounters() {
+    return sparkCounters;
+  }
+}
diff --git spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java
new file mode 100644
index 0000000..95b8f0b
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SparkCounters is used to collect Hive operator metrics through Spark accumulators. Spark
+ * accumulators have a few limitations:
+ * 1. An accumulator must be created on the Spark context (driver) side.
+ * 2. Spark tasks can only increment a metric's value.
+ * 3. Accumulator values can only be read on the Spark context side.
+ * This counter API is designed to fit Hive's requirements, with matching access restrictions
+ * that follow from the accumulator limitations above:
+ * 1. A counter must be created on the driver side if it is going to be accessed in a task.
+ * 2. increment may only be invoked on the task side.
+ * 3. Hive can only read counter values on the driver side.
+ */
+public class SparkCounters implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final Log LOG = LogFactory.getLog(SparkCounters.class);
+
+  private Map<String, SparkCounterGroup> sparkCounterGroups;
+
+  private transient JavaSparkContext javaSparkContext;
+  private transient Configuration hiveConf;
+
+  public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) {
+    this.javaSparkContext = javaSparkContext;
+    this.hiveConf = hiveConf;
+    sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
+  }
+
+  public void createCounter(Enum<?> key) {
+    createCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  public void createCounter(String groupName, Enum<?> key) {
+    createCounter(groupName, key.name(), 0L);
+  }
+
+  public void createCounter(String groupName, String counterName) {
+    createCounter(groupName, counterName, 0L);
+  }
+
+  public void createCounter(String groupName, String counterName, long initValue) {
+    getGroup(groupName).createCounter(counterName, initValue);
+  }
+
+  public void increment(Enum<?> key, long incrValue) {
+    increment(key.getDeclaringClass().getName(), key.name(), incrValue);
+  }
+
+  public void increment(String groupName, String counterName, long value) {
+    SparkCounter counter = getGroup(groupName).getCounter(counterName);
+    if (counter == null) {
+      LOG.error(
+        String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
+    } else {
+      counter.increment(value);
+    }
+  }
+
+  public long getValue(String groupName, String counterName) {
+    SparkCounter counter = getGroup(groupName).getCounter(counterName);
+    if (counter == null) {
+      LOG.error(
+        String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
+      return 0;
+    } else {
+      return counter.getValue();
+    }
+  }
+
+  public SparkCounter getCounter(String groupName, String counterName) {
+    return getGroup(groupName).getCounter(counterName);
+  }
+
+  public SparkCounter getCounter(Enum<?> key) {
+    return getCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  private SparkCounterGroup getGroup(String groupName) {
+    SparkCounterGroup group = sparkCounterGroups.get(groupName);
+    if (group == null) {
+      String groupDisplayName =
+        ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
+      group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext);
+      sparkCounterGroups.put(groupName, group);
+    }
+    return group;
+  }
+
+  public Map<String, SparkCounterGroup> getSparkCounterGroups() {
+    return sparkCounterGroups;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    Map<String, SparkCounterGroup> groups = getSparkCounterGroups();
+    if (groups != null) {
+      for (Map.Entry<String, SparkCounterGroup> groupEntry : groups.entrySet()) {
+        String groupName = groupEntry.getKey();
+        SparkCounterGroup group = groupEntry.getValue();
+        sb.append(groupName).append("\n");
+        Map<String, SparkCounter> counters = group.getSparkCounters();
+        for (Map.Entry<String, SparkCounter> counterEntry : counters.entrySet()) {
+          String counterName = counterEntry.getKey();
+          SparkCounter counter = counterEntry.getValue();
+          sb.append("\t")
+            .append(counterName)
+            .append(": ")
+            .append(counter.getValue())
+            .append("\n");
+        }
+      }
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Dump all SparkCounter values.
+   * The RSC driver should call this method before sending the counters back to the client.
+   */
+  public void dumpAllCounters() {
+    for (SparkCounterGroup counterGroup : sparkCounterGroups.values()) {
+      for (SparkCounter counter : counterGroup.getSparkCounters().values()) {
+        counter.dumpValue();
+      }
+    }
+  }
+}
diff --git spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 378303b..407e0bd 100644
--- spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -254,7 +254,7 @@ public Integer call(JobContext jc) throws Exception {
            public void call(Integer l) throws Exception {

            }
-          }));
+          }), null);

      future.get(TIMEOUT, TimeUnit.SECONDS);
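One detail worth noting in the new SparkCounter above: accumulator values can only be read where the SparkContext lives, so getValue() falls back to accumValue once the counter has been serialized back to the client. That is why RemoteDriver calls dumpAllCounters() before building the JobResult. A hedged sketch of the intent (the helper method is illustrative, not part of the patch):

import org.apache.hive.spark.counter.SparkCounters;

// Illustrative only: snapshot the live accumulator values into each SparkCounter
// before the counters object is serialized and sent back to the client.
final class CounterDumpSketch {
  static SparkCounters snapshotForClient(SparkCounters counters) {
    // After this call, getValue() works off the cached values, so the object is
    // safe to ship in Protocol.JobResult and read via JobHandle.getSparkCounters().
    counters.dumpAllCounters();
    return counters;
  }
}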