diff --git itests/pom.xml itests/pom.xml
index a15e04a..53f6c98 100644
--- itests/pom.xml
+++ itests/pom.xml
@@ -49,6 +49,12 @@
     <module>hive-minikdc</module>
   </modules>
 
   <profiles>
+    <profile>
+      <id>hadoop-1</id>
+      <modules>
+        <module>qtest-spark</module>
+      </modules>
+    </profile>
 
     <profile>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index b33f0e2..ee16c9e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -220,7 +220,7 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
 
     // As we always use foreach action to submit the RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
     SimpleSparkJobStatus sparkJobStatus =
-        new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters);
+        new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
     return new SparkJobRef(jobId, sparkJobStatus);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
index 31a45d0..55ca782 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.executor.InputMetrics;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -49,17 +50,20 @@
 
   private JobStateListener jobStateListener;
   private JobProgressListener jobProgressListener;
   private SparkCounters sparkCounters;
+  private JavaFutureAction<Void> future;
 
   public SimpleSparkJobStatus(
       int jobId,
       JobStateListener stateListener,
       JobProgressListener progressListener,
-      SparkCounters sparkCounters) {
+      SparkCounters sparkCounters,
+      JavaFutureAction<Void> future) {
     this.jobId = jobId;
     this.jobStateListener = stateListener;
     this.jobProgressListener = progressListener;
     this.sparkCounters = sparkCounters;
+    this.future = future;
   }
 
   @Override
@@ -69,7 +73,14 @@ public int getJobId() {
 
   @Override
   public SparkJobState getState() {
-    return jobStateListener.getJobState(jobId);
+    // For a Spark job with empty source data, the job is never actually submitted, so we would
+    // never receive JobStart/JobEnd events in JobStateListener. Use the JavaFutureAction to get
+    // the current job state.
+    if (future.isDone()) {
+      return SparkJobState.SUCCEEDED;
+    } else {
+      return jobStateListener.getJobState(jobId);
+    }
   }
 
   @Override
@@ -135,6 +146,10 @@ public SparkStatistics getSparkStatistics() {
     // add spark job metrics.
String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; Map> jobMetric = jobStateListener.getJobMetric(jobId); + if (jobMetric == null) { + return null; + } + Map flatJobMetric = combineJobLevelMetrics(jobMetric); for (Map.Entry entry : flatJobMetric.entrySet()) { sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); @@ -170,31 +185,35 @@ public void cleanup() { boolean shuffleWriteMetricExist = false; for (List stageMetric : jobMetric.values()) { - for (TaskMetrics taskMetrics : stageMetric) { - executorDeserializeTime += taskMetrics.executorDeserializeTime(); - executorRunTime += taskMetrics.executorRunTime(); - resultSize += taskMetrics.resultSize(); - jvmGCTime += taskMetrics.jvmGCTime(); - resultSerializationTime += taskMetrics.resultSerializationTime(); - memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); - diskBytesSpilled += taskMetrics.diskBytesSpilled(); - if (!taskMetrics.inputMetrics().isEmpty()) { - inputMetricExist = true; - bytesRead += taskMetrics.inputMetrics().get().bytesRead(); - } - Option shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); - if (!shuffleReadMetricsOption.isEmpty()) { - shuffleReadMetricExist = true; - remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); - localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); - fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); - remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); - } - Option shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); - if (!shuffleWriteMetricsOption.isEmpty()) { - shuffleWriteMetricExist = true; - shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); - shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); + if (stageMetric != null) { + for (TaskMetrics taskMetrics : stageMetric) { + if (taskMetrics != null) { + executorDeserializeTime += taskMetrics.executorDeserializeTime(); + executorRunTime += taskMetrics.executorRunTime(); + resultSize += taskMetrics.resultSize(); + jvmGCTime += taskMetrics.jvmGCTime(); + resultSerializationTime += taskMetrics.resultSerializationTime(); + memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); + diskBytesSpilled += taskMetrics.diskBytesSpilled(); + if (!taskMetrics.inputMetrics().isEmpty()) { + inputMetricExist = true; + bytesRead += taskMetrics.inputMetrics().get().bytesRead(); + } + Option shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); + if (!shuffleReadMetricsOption.isEmpty()) { + shuffleReadMetricExist = true; + remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); + localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); + fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); + remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); + } + Option shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); + if (!shuffleWriteMetricsOption.isEmpty()) { + shuffleWriteMetricExist = true; + shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); + shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); + } + } } } }