diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index dfe0efe..6217de4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -107,15 +107,15 @@ public SparkCounters getCounter() {
 
   @Override
   public SparkStatistics getSparkStatistics() {
+    MetricsCollection metricsCollection = jobHandle.getMetrics();
+    if (metricsCollection == null || getCounter() == null) {
+      return null;
+    }
     SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(getCounter());
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
-    MetricsCollection metricsCollection = jobHandle.getMetrics();
-    if (metricsCollection == null) {
-      return null;
-    }
 
     Map<String, Long> flatJobMetric = extractMetrics(metricsCollection);
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index d6e0f9d..6aeb6b7 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -40,7 +40,7 @@
   private final MetricsCollection metrics;
   private final Promise<T> promise;
   private final List<Integer> sparkJobIds;
-  private SparkCounters sparkCounters;
+  private volatile SparkCounters sparkCounters;
 
   JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
     this.cancelled = new AtomicBoolean();
diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 0d49731..7d02f41 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -17,18 +17,6 @@
 
 package org.apache.hive.spark.client;
 
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -36,19 +24,44 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
-import org.apache.spark.scheduler.*;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
-import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hive.spark.client.metrics.Metrics;
-import org.apache.hive.spark.client.rpc.Rpc;
-import org.apache.hive.spark.client.rpc.RpcConfiguration;
-import org.apache.hive.spark.counter.SparkCounters;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Driver code for the Spark client library.
 */
@@ -289,7 +302,8 @@ public void call(JavaFutureAction<?> future, SparkCounters sparkCounters) {
         // re-thrown so that the executor can destroy the affected thread (or the JVM can
         // die or whatever would happen if the throwable bubbled up).
         LOG.info("Failed to run job " + req.id, t);
-        protocol.jobFinished(req.id, null, t, null);
+        protocol.jobFinished(req.id, null, t,
+            sparkCounters != null ? sparkCounters.snapshot() : null);
         throw new ExecutionException(t);
       } finally {
         jc.setMonitorCb(null);
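
Note on the JobHandleImpl change: `sparkCounters` is presumably written by the RPC handler thread that delivers job results from the remote driver, and read by a different thread polling the handle (e.g. through RemoteSparkJobStatus.getCounter()), so without `volatile` the reading thread has no guarantee of ever observing the write. A minimal sketch of the visibility pattern, assuming a hypothetical CounterHandle class (only the field and the threading roles mirror the patch; the rest is illustrative, not Hive's actual implementation):

    import org.apache.hive.spark.counter.SparkCounters;

    class CounterHandle {
      // Without volatile, the polling thread may keep seeing a stale null
      // even after the RPC thread has published the counters.
      private volatile SparkCounters sparkCounters;

      // Hypothetical setter, called on the RPC event thread when the
      // remote driver reports back.
      void onJobResult(SparkCounters counters) {
        this.sparkCounters = counters;
      }

      // Called on the client/monitor thread; volatile guarantees that a
      // completed write by the RPC thread is visible here.
      SparkCounters getCounters() {
        return sparkCounters;
      }
    }

Relatedly, getSparkStatistics() now bails out before building statistics if either the metrics or the counters are missing, and the failure path in RemoteDriver forwards a counter snapshot when one exists instead of always sending null.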