diff --git itests/pom.xml itests/pom.xml
index a15e04a..53f6c98 100644
--- itests/pom.xml
+++ itests/pom.xml
@@ -49,6 +49,12 @@
     <module>hive-minikdc</module>
+    <profile>
+      <id>hadoop-1</id>
+      <modules>
+        <module>qtest-spark</module>
+      </modules>
+    </profile>
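Note: with this profile entry in place, qtest-spark is also built whenever the hadoop-1 profile is active (activated the usual Maven way, e.g. mvn test -Phadoop-1).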
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index b33f0e2..ee16c9e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -220,7 +220,7 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
// As we always use foreach action to submit RDD graph, it would only trigger one job.
int jobId = future.jobIds().get(0);
SimpleSparkJobStatus sparkJobStatus =
- new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters);
+ new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
return new SparkJobRef(jobId, sparkJobStatus);
}
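For context, the future used above is the handle Spark returns for the async foreach action that submits the RDD graph; a minimal sketch of that pattern (the RDD name and element type are hypothetical, assuming Spark's standard JavaFutureAction API):

  import org.apache.spark.api.java.JavaFutureAction;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.VoidFunction;

  class SubmissionSketch {
    static int firstJobId(JavaRDD<String> rdd) {
      // foreachAsync submits the RDD graph as a single Spark job and returns
      // a future that reports the ids of the jobs it triggered.
      JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<String>() {
        @Override
        public void call(String row) {
          // no-op: the action exists only to force evaluation of the graph
        }
      });
      return future.jobIds().get(0); // one foreach action => one job id
    }
  }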
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
index 31a45d0..55ca782 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.executor.InputMetrics;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -49,17 +50,20 @@
private JobStateListener jobStateListener;
private JobProgressListener jobProgressListener;
private SparkCounters sparkCounters;
+ private JavaFutureAction<Void> future;
public SimpleSparkJobStatus(
int jobId,
JobStateListener stateListener,
JobProgressListener progressListener,
- SparkCounters sparkCounters) {
+ SparkCounters sparkCounters,
+ JavaFutureAction<Void> future) {
this.jobId = jobId;
this.jobStateListener = stateListener;
this.jobProgressListener = progressListener;
this.sparkCounters = sparkCounters;
+ this.future = future;
}
@Override
@@ -69,7 +73,14 @@ public int getJobId() {
@Override
public SparkJobState getState() {
- return jobStateListener.getJobState(jobId);
+ // For a Spark job with empty source data, the job is never actually submitted, so we would
+ // never receive JobStart/JobEnd events in JobStateListener; use the JavaFutureAction to
+ // determine the current job state instead.
+ if (future.isDone()) {
+ return SparkJobState.SUCCEEDED;
+ } else {
+ return jobStateListener.getJobState(jobId);
+ }
}
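A hedged sketch of the polling loop this change keeps from hanging (the method and wait interval are illustrative, not part of the patch; SUCCEEDED is the only SparkJobState value the patch itself confirms):

  import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
  import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;

  final class MonitorSketch {
    static void waitForCompletion(SparkJobStatus status) throws InterruptedException {
      // Without the future.isDone() check, a job over an empty source never
      // fires JobStart/JobEnd, the listener never reports a terminal state,
      // and this loop would spin forever.
      while (status.getState() != SparkJobState.SUCCEEDED) {
        Thread.sleep(1000);
      }
    }
  }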
@Override
@@ -135,6 +146,10 @@ public SparkStatistics getSparkStatistics() {
// add spark job metrics.
String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
Map<Integer, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+ if (jobMetric == null) {
+ return null;
+ }
+
Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
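The new null guard is needed because the listener only has metrics for jobs it actually observed; a sketch of the accessor shape this call site implies (the field name and synchronization are inferred, not the listener's verbatim code):

  // Inferred shape of JobStateListener.getJobMetric(): stage-level task
  // metrics keyed by stage id, or null when no JobStart event was ever
  // received for jobId -- exactly the empty-source case handled above.
  public synchronized Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) {
    return jobIdToStageMetrics.get(jobId); // jobIdToStageMetrics: hypothetical field name
  }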
@@ -170,31 +185,35 @@ public void cleanup() {
boolean shuffleWriteMetricExist = false;
for (List<TaskMetrics> stageMetric : jobMetric.values()) {
- for (TaskMetrics taskMetrics : stageMetric) {
- executorDeserializeTime += taskMetrics.executorDeserializeTime();
- executorRunTime += taskMetrics.executorRunTime();
- resultSize += taskMetrics.resultSize();
- jvmGCTime += taskMetrics.jvmGCTime();
- resultSerializationTime += taskMetrics.resultSerializationTime();
- memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
- diskBytesSpilled += taskMetrics.diskBytesSpilled();
- if (!taskMetrics.inputMetrics().isEmpty()) {
- inputMetricExist = true;
- bytesRead += taskMetrics.inputMetrics().get().bytesRead();
- }
- Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
- if (!shuffleReadMetricsOption.isEmpty()) {
- shuffleReadMetricExist = true;
- remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
- localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
- fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
- remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
- }
- Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
- if (!shuffleWriteMetricsOption.isEmpty()) {
- shuffleWriteMetricExist = true;
- shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
- shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+ }
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+ }
}
}
}
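Since getSparkStatistics() can now return null, callers have to tolerate it; a minimal usage sketch (variable names illustrative):

  SparkStatistics stats = sparkJobStatus.getSparkStatistics();
  if (stats != null) {
    // consume the statistics; null means the job reported no metrics
  }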