diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index ee16c9e..3cca7f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -32,14 +32,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -52,7 +51,6 @@
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
 
 import scala.Tuple2;
 
@@ -113,17 +111,13 @@ public static synchronized SparkClient getInstance(Configuration hiveConf) {
 
   private List<String> localFiles = new ArrayList<String>();
 
-  private JobStateListener jobStateListener;
-
-  private JobProgressListener jobProgressListener;
+  private JobMetricsListener jobMetricsListener;
 
   private SparkClient(Configuration hiveConf) {
     SparkConf sparkConf = initiateSparkConf(hiveConf);
     sc = new JavaSparkContext(sparkConf);
-    jobStateListener = new JobStateListener();
-    jobProgressListener = new JobProgressListener(sparkConf);
-    sc.sc().listenerBus().addListener(jobStateListener);
-    sc.sc().listenerBus().addListener(jobProgressListener);
+    jobMetricsListener = new JobMetricsListener();
+    sc.sc().listenerBus().addListener(jobMetricsListener);
   }
 
   private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -217,10 +211,11 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.
     JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-    // As we always use foreach action to submit RDD graph, it would only trigger on job.
+    // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
     SimpleSparkJobStatus sparkJobStatus =
-        new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
+        new SimpleSparkJobStatus(sc, jobId, jobMetricsListener,
+            sparkCounters, future);
     return new SparkJobRef(jobId, sparkJobStatus);
   }
 
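Note (illustrative, not part of the patch): SparkClient learns the job id by submitting the RDD graph through an async action, because JavaFutureAction exposes the id while the job is still running -- that is what the "only way to get jobId now" comment above refers to. A minimal, self-contained sketch of the pattern, with a placeholder RDD and a no-op function standing in for HiveVoidFunction (the class name and local-mode config are made up for the example):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class AsyncSubmitSketch {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("async-submit-sketch").setMaster("local[2]"));

    // foreachAsync() returns immediately, so the driver can read the job id
    // while the job is still executing.
    JavaFutureAction<Void> future = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2)
        .foreachAsync(new VoidFunction<Integer>() {
          @Override
          public void call(Integer i) {
            // no-op, stands in for HiveVoidFunction
          }
        });

    List<Integer> jobIds = future.jobIds();   // a single foreach action triggers one job
    System.out.println("Submitted Spark job " + jobIds.get(0));

    future.get();   // wait for completion
    sc.stop();
  }
}
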
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 5fd43bd..3b13d90 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.spark.JobExecutionStatus;
 
 /**
  * SparkJobMonitor monitor a single Spark job status in a loop until job finished/failed/killed.
@@ -59,26 +60,18 @@ public int startMonitor() {
     boolean done = false;
     int failedCounter = 0;
     int rc = 0;
-    SparkJobState lastState = null;
+    JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = 0;
 
     while (true) {
-      try {
-        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-        SparkJobState state = sparkJobStatus.getState();
-
-        if (state != lastState || state == SparkJobState.RUNNING) {
+      JobExecutionStatus state = sparkJobStatus.getState();
+      if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
         lastState = state;
+        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
         switch (state) {
-          case SUBMITTED:
-            console.printInfo("Status: Submitted");
-            break;
-          case INITING:
-            console.printInfo("Status: Initializing");
-            break;
           case RUNNING:
             if (!running) {
               // print job stages.
@@ -110,14 +103,7 @@ public int startMonitor() {
             running = false;
             done = true;
             break;
-          case KILLED:
-            console.printInfo("Status: Killed");
-            running = false;
-            done = true;
-            rc = 1;
-            break;
           case FAILED:
-          case ERROR:
            console.printError("Status: Failed");
             running = false;
             done = true;
@@ -187,17 +173,17 @@ private void printStatus(Map<String, SparkStageProgress> progressMap,
           if (failed > 0) {
             /* tasks finished but some failed */
             reportBuffer.append(
-                String.format(
-                    "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, failed, total, cost));
+                String.format(
+                    "%s: %d(-%d)/%d Finished with failed tasks\t",
+                    stageName, complete, failed, total));
           } else {
             if (complete == total) {
               reportBuffer.append(
-                  String.format("%s: %d/%d Finished in %,.2fs\t", stageName, complete, total, cost));
+                  String.format("%s: %d/%d Finished\t", stageName, complete, total));
             } else {
               reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
             }
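Note (illustrative, not part of the patch): org.apache.spark.JobExecutionStatus only distinguishes RUNNING, SUCCEEDED, FAILED and UNKNOWN, which is why the SUBMITTED, INITING, KILLED and ERROR branches of the old SparkJobState handling disappear above; a cancelled job can only surface as FAILED, and a job Spark has not scheduled yet has no status at all, hence the new null check. A defensive mapping could look like the following sketch (the helper class and its return codes are made up, not Hive's):

import org.apache.spark.JobExecutionStatus;

final class StatusMapping {
  private StatusMapping() {
  }

  // 0 = success, 1 = failure, 2 = not finished / not known yet.
  static int toReturnCode(JobExecutionStatus status) {
    if (status == null) {
      // The job has not reached Spark's DAG scheduler yet.
      return 2;
    }
    switch (status) {
      case SUCCEEDED:
        return 0;
      case FAILED:
        return 1;
      case RUNNING:
      case UNKNOWN:
      default:
        return 2;
    }
  }
}
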
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index bbc9fc3..b5c1837 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.spark.JobExecutionStatus;
 
 import java.util.Map;
 
@@ -29,7 +30,7 @@
 
   public int getJobId();
 
-  public SparkJobState getState();
+  public JobExecutionStatus getState();
 
   public int[] getStageIds();
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
index f1a2cfe..cfec354 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
@@ -23,23 +23,21 @@
   private int succeededTaskCount;
   private int runningTaskCount;
   private int failedTaskCount;
-  private int killedTaskCount;
-  private long cumulativeTime;
+  // TODO: remove the following two metrics as they're not available in the current Spark API;
+  // we can add them back once Spark provides them.
+//  private int killedTaskCount;
+//  private long cumulativeTime;
 
   public SparkStageProgress(
       int totalTaskCount,
       int succeededTaskCount,
       int runningTaskCount,
-      int failedTaskCount,
-      int killedTaskCount,
-      long cumulativeTime) {
+      int failedTaskCount) {
     this.totalTaskCount = totalTaskCount;
     this.succeededTaskCount = succeededTaskCount;
     this.runningTaskCount = runningTaskCount;
     this.failedTaskCount = failedTaskCount;
-    this.killedTaskCount = killedTaskCount;
-    this.cumulativeTime = cumulativeTime;
   }
 
   public int getTotalTaskCount() {
@@ -58,14 +56,6 @@ public int getFailedTaskCount() {
     return failedTaskCount;
   }
 
-  public int getKilledTaskCount() {
-    return killedTaskCount;
-  }
-
-  public long getCumulativeTime() {
-    return cumulativeTime;
-  }
-
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof SparkStageProgress) {
@@ -73,8 +63,7 @@ public boolean equals(Object obj) {
       return getTotalTaskCount() == other.getTotalTaskCount()
         && getSucceededTaskCount() == other.getSucceededTaskCount()
         && getRunningTaskCount() == other.getRunningTaskCount()
-        && getFailedTaskCount() == other.getFailedTaskCount()
-        && getKilledTaskCount() == other.getKilledTaskCount();
+        && getFailedTaskCount() == other.getFailedTaskCount();
     }
     return false;
   }
@@ -90,10 +79,6 @@ public String toString() {
     sb.append(getRunningTaskCount());
     sb.append(" Failed: ");
     sb.append(getFailedTaskCount());
-    sb.append(" Killed: ");
-    sb.append(getKilledTaskCount());
-    sb.append(" CumulativeTime: ");
-    sb.append(getCumulativeTime() + "ms");
     return sb.toString();
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
new file mode 100644
index 0000000..931ec7f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.JobSucceeded;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+public class JobMetricsListener implements SparkListener {
+
+  private final static Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+
+  @Override
+  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+  }
+
+  @Override
+  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+  }
+
+  @Override
+  public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+  }
+
+  @Override
+  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+  }
+
+  @Override
+  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+    int stageId = taskEnd.stageId();
+    int stageAttemptId = taskEnd.stageAttemptId();
+    String stageIdentifier = stageId + "_" + stageAttemptId;
+    Integer jobId = stageIdToJobId.get(stageId);
+    if (jobId == null) {
+      LOG.warn("Can not find job id for stage[" + stageId + "].");
+    } else {
+      Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+      if (jobMetrics == null) {
+        jobMetrics = Maps.newHashMap();
+        allJobMetrics.put(jobId, jobMetrics);
+      }
+      List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+      if (stageMetrics == null) {
+        stageMetrics = Lists.newLinkedList();
+        jobMetrics.put(stageIdentifier, stageMetrics);
+      }
+      stageMetrics.add(taskEnd.taskMetrics());
+    }
+  }
+
+  @Override
+  public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+    int jobId = jobStart.jobId();
+    int size = jobStart.stageIds().size();
+    int[] intStageIds = new int[size];
+    for(int i=0; i< size; i++) {
+      Integer stageId = (Integer) jobStart.stageIds().apply(i);
+      intStageIds[i] = stageId;
+      stageIdToJobId.put(stageId, jobId);
+    }
+    jobIdToStageId.put(jobId, intStageIds);
+  }
+
+  @Override
+  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+
+  }
+
+  @Override
+  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+  }
+
+  @Override
+  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+  }
+
+  @Override
+  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+  }
+
+  @Override
+  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+  }
+
+  @Override
+  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+  }
+
+  @Override
+  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+  }
+
+  @Override
+  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+  }
+
+  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+    return allJobMetrics.get(jobId);
+  }
+
+  public synchronized void cleanup(int jobId) {
+    allJobMetrics.remove(jobId);
+    jobIdToStageId.remove(jobId);
+    Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, Integer> entry = iterator.next();
+      if (entry.getValue() == jobId) {
+        iterator.remove();
+      }
+    }
+  }
+}
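Note (illustrative, not part of the patch): JobMetricsListener only accumulates raw TaskMetrics, keyed by job id and then by "stageId_stageAttemptId"; turning them into job-level numbers is left to SimpleSparkJobStatus further down. A sketch of one such roll-up over the same map layout (the helper class is made up; executor run time is one of the fields TaskMetrics exposes):

import java.util.List;
import java.util.Map;

import org.apache.spark.executor.TaskMetrics;

final class MetricsRollup {
  private MetricsRollup() {
  }

  // Sums executor run time (ms) over every task of every stage attempt of one job,
  // i.e. over the value returned by JobMetricsListener.getJobMetric(jobId).
  static long totalExecutorRunTime(Map<String, List<TaskMetrics>> jobMetrics) {
    long total = 0L;
    for (Map.Entry<String, List<TaskMetrics>> stage : jobMetrics.entrySet()) {
      for (TaskMetrics task : stage.getValue()) {
        total += task.executorRunTime();
      }
    }
    return total;
  }
}

SimpleSparkJobStatus.combineJobLevelMetrics() performs the same kind of walk for a larger set of counters.
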
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
deleted file mode 100644
index e607095..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.spark.status.impl;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.JobSucceeded;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
-
-public class JobStateListener implements SparkListener {
-
-  private final static Log LOG = LogFactory.getLog(JobStateListener.class);
-
-  private final Map<Integer, SparkJobState> jobIdToStates = Maps.newHashMap();
-  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
-  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
-  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
-
-  @Override
-  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
-  }
-
-  @Override
-  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
-
-  }
-
-  @Override
-  public void onTaskStart(SparkListenerTaskStart taskStart) {
-
-  }
-
-  @Override
-  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
-
-  }
-
-  @Override
-  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-    int stageId = taskEnd.stageId();
-    int stageAttemptId = taskEnd.stageAttemptId();
-    String stageIdentifier = stageId + "_" + stageAttemptId;
-    Integer jobId = stageIdToJobId.get(stageId);
-    if (jobId == null) {
-      LOG.warn("Can not find job id for stage[" + stageId + "].");
-    } else {
-      Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
-      if (jobMetrics == null) {
-        jobMetrics = Maps.newHashMap();
-        allJobMetrics.put(jobId, jobMetrics);
-      }
-      List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
-      if (stageMetrics == null) {
-        stageMetrics = Lists.newLinkedList();
-        jobMetrics.put(stageIdentifier, stageMetrics);
-      }
-      stageMetrics.add(taskEnd.taskMetrics());
-    }
-  }
-
-  @Override
-  public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-    int jobId = jobStart.jobId();
-    jobIdToStates.put(jobId, SparkJobState.RUNNING);
-    int size = jobStart.stageIds().size();
-    int[] intStageIds = new int[size];
-    for(int i=0; i< size; i++) {
-      Integer stageId = (Integer) jobStart.stageIds().apply(i);
-      intStageIds[i] = stageId;
-      stageIdToJobId.put(stageId, jobId);
-    }
-    jobIdToStageId.put(jobId, intStageIds);
-  }
-
-  @Override
-  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
-    // JobSucceeded is a scala singleton object, so we need to add a dollar at the second part.
-    if (jobEnd.jobResult().getClass().getName().equals(JobSucceeded.class.getName() + "$")) {
-      jobIdToStates.put(jobEnd.jobId(), SparkJobState.SUCCEEDED);
-    } else {
-      jobIdToStates.put(jobEnd.jobId(), SparkJobState.FAILED);
-    }
-  }
-
-  @Override
-  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
-
-  }
-
-  @Override
-  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
-
-  }
-
-  @Override
-  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
-
-  }
-
-  @Override
-  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
-  }
-
-  @Override
-  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
-
-  }
-
-  @Override
-  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
-
-  }
-
-  @Override
-  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
-
-  }
-
-  public synchronized SparkJobState getJobState(int jobId) {
-    return jobIdToStates.get(jobId);
-  }
-
-  public synchronized int[] getStageIds(int jobId) {
-    return jobIdToStageId.get(jobId);
-  }
-
-  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
-    return allJobMetrics.get(jobId);
-  }
-
-  public synchronized void cleanup(int jobId) {
-    allJobMetrics.remove(jobId);
-    jobIdToStates.remove(jobId);
-    jobIdToStageId.remove(jobId);
-    Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<Integer, Integer> entry = iterator.next();
-      if (entry.getValue() == jobId) {
-        iterator.remove();
-      }
-    }
-  }
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
index 55ca782..19fd20d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -26,42 +25,35 @@
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.StageInfo;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.spark.ui.jobs.UIData;
 
 import scala.Option;
-import scala.Tuple2;
-
-import static scala.collection.JavaConversions.bufferAsJavaList;
-import static scala.collection.JavaConversions.mutableMapAsJavaMap;
 
 public class SimpleSparkJobStatus implements SparkJobStatus {
 
+  private final JavaSparkContext sparkContext;
   private int jobId;
-  private JobStateListener jobStateListener;
-  private JobProgressListener jobProgressListener;
+  // After SPARK-2321, we only use JobMetricsListener to get job metrics
+  // TODO: remove it when the new API provides equivalent functionality
+  private JobMetricsListener jobMetricsListener;
   private SparkCounters sparkCounters;
   private JavaFutureAction<Void> future;
 
-  public SimpleSparkJobStatus(
-      int jobId,
-      JobStateListener stateListener,
-      JobProgressListener progressListener,
-      SparkCounters sparkCounters,
-      JavaFutureAction<Void> future) {
-
+  public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+      JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+      JavaFutureAction<Void> future) {
+    this.sparkContext = sparkContext;
     this.jobId = jobId;
-    this.jobStateListener = stateListener;
-    this.jobProgressListener = progressListener;
+    this.jobMetricsListener = jobMetricsListener;
     this.sparkCounters = sparkCounters;
     this.future = future;
   }
@@ -72,62 +64,39 @@ public int getJobId() {
   }
 
   @Override
-  public SparkJobState getState() {
+  public JobExecutionStatus getState() {
     // For spark job with empty source data, it's not submitted actually, so we would never
     // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
     // job state.
     if (future.isDone()) {
-      return SparkJobState.SUCCEEDED;
+      return JobExecutionStatus.SUCCEEDED;
    } else {
-      return jobStateListener.getJobState(jobId);
+      // SparkJobInfo may not be available yet
+      SparkJobInfo sparkJobInfo = getJobInfo();
+      return sparkJobInfo == null ? null : sparkJobInfo.status();
     }
   }
 
   @Override
   public int[] getStageIds() {
-    return jobStateListener.getStageIds(jobId);
+    SparkJobInfo sparkJobInfo = getJobInfo();
+    return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
   }
 
   @Override
   public Map<String, SparkStageProgress> getSparkStageProgress() {
     Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
-    int[] stageIds = jobStateListener.getStageIds(jobId);
-    if (stageIds != null) {
-      for (int stageId : stageIds) {
-        List<StageInfo> stageInfos = getStageInfo(stageId);
-        for (StageInfo stageInfo : stageInfos) {
-          Tuple2 tuple2 = new Tuple2(stageInfo.stageId(),
-              stageInfo.attemptId());
-          UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get();
-          if (uiData != null) {
-            int runningTaskCount = uiData.numActiveTasks();
-            int completedTaskCount = uiData.numCompleteTasks();
-            int failedTaskCount = uiData.numFailedTasks();
-            int totalTaskCount = stageInfo.numTasks();
-            int killedTaskCount = 0;
-            long costTime;
-            Option startOption = stageInfo.submissionTime();
-            Option completeOption = stageInfo.completionTime();
-            if (startOption.isEmpty()) {
-              costTime = 0;
-            } else if (completeOption.isEmpty()) {
-              long startTime = (Long)startOption.get();
-              costTime = System.currentTimeMillis() - startTime;
-            } else {
-              long startTime = (Long)startOption.get();
-              long completeTime = (Long)completeOption.get();
-              costTime = completeTime - startTime;
-            }
-            SparkStageProgress stageProgress = new SparkStageProgress(
-                totalTaskCount,
-                completedTaskCount,
-                runningTaskCount,
-                failedTaskCount,
-                killedTaskCount,
-                costTime);
-            stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
-          }
-        }
+    for (int stageId : getStageIds()) {
+      SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+      if (sparkStageInfo != null) {
+        int runningTaskCount = sparkStageInfo.numActiveTasks();
+        int completedTaskCount = sparkStageInfo.numCompletedTasks();
+        int failedTaskCount = sparkStageInfo.numFailedTasks();
+        int totalTaskCount = sparkStageInfo.numTasks();
+        SparkStageProgress sparkStageProgress = new SparkStageProgress(
+            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
+            + sparkStageInfo.currentAttemptId(), sparkStageProgress);
       }
     }
     return stageProgresses;
@@ -145,7 +114,7 @@ public SparkStatistics getSparkStatistics() {
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+    Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
     if (jobMetric == null) {
       return null;
     }
@@ -160,7 +129,7 @@
 
   @Override
   public void cleanup() {
-    jobStateListener.cleanup(jobId);
+    jobMetricsListener.cleanup(jobId);
   }
 
   private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
@@ -242,29 +211,11 @@ public void cleanup() {
     return results;
   }
 
-  private List<StageInfo> getStageInfo(int stageId) {
-    List<StageInfo> stageInfos = new LinkedList<StageInfo>();
-
-    Map<Integer, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
-    List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
-    List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
-
-    if (activeStages.containsKey(stageId)) {
-      stageInfos.add(activeStages.get(stageId));
-    } else {
-      for (StageInfo stageInfo : completedStages) {
-        if (stageInfo.stageId() == stageId) {
-          stageInfos.add(stageInfo);
-        }
-      }
-
-      for (StageInfo stageInfo : failedStages) {
-        if (stageInfo.stageId() == stageId) {
-          stageInfos.add(stageInfo);
-        }
-      }
-    }
+  private SparkJobInfo getJobInfo() {
+    return sparkContext.statusTracker().getJobInfo(jobId);
+  }
 
-    return stageInfos;
+  private SparkStageInfo getStageInfo(int stageId) {
+    return sparkContext.statusTracker().getStageInfo(stageId);
   }
 }
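Note (illustrative, not part of the patch): after this change, all job and stage progress comes from polling the status tracker API introduced by SPARK-2321 (Spark 1.2) rather than from Hive-side listeners. The sketch below isolates that polling loop; the helper class and its parameters are made up, and it assumes the job was submitted with an async action as in SparkClient.execute() above:

import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;

final class ProgressPoller {
  private ProgressPoller() {
  }

  // Prints per-stage progress roughly once a second until the given action finishes.
  static void pollUntilDone(JavaSparkContext sc, JavaFutureAction<Void> future, int jobId)
      throws InterruptedException {
    while (!future.isDone()) {
      // getJobInfo() returns null until the job reaches the scheduler, which is why
      // SimpleSparkJobStatus.getState() can return null and SparkJobMonitor guards against it.
      SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(jobId);
      if (jobInfo != null) {
        StringBuilder report = new StringBuilder("Status: " + jobInfo.status());
        for (int stageId : jobInfo.stageIds()) {
          SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(stageId);
          if (stageInfo != null) {
            report.append(String.format("  stage %d_%d: %d/%d done",
                stageInfo.stageId(), stageInfo.currentAttemptId(),
                stageInfo.numCompletedTasks(), stageInfo.numTasks()));
          }
        }
        System.out.println(report);
      }
      Thread.sleep(1000);
    }
  }
}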