diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index ee16c9e..f579711 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -52,7 +51,6 @@
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
 
 import scala.Tuple2;
 
@@ -115,15 +113,11 @@ public static synchronized SparkClient getInstance(Configuration hiveConf) {
 
   private JobStateListener jobStateListener;
 
-  private JobProgressListener jobProgressListener;
-
   private SparkClient(Configuration hiveConf) {
     SparkConf sparkConf = initiateSparkConf(hiveConf);
     sc = new JavaSparkContext(sparkConf);
     jobStateListener = new JobStateListener();
-    jobProgressListener = new JobProgressListener(sparkConf);
     sc.sc().listenerBus().addListener(jobStateListener);
-    sc.sc().listenerBus().addListener(jobProgressListener);
   }
 
   private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -217,10 +211,11 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
     JavaPairRDD finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.
     JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-    // As we always use foreach action to submit RDD graph, it would only trigger on job.
+    // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
     SimpleSparkJobStatus sparkJobStatus =
-        new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
+        new SimpleSparkJobStatus(sc, jobId, jobStateListener,
+            sparkCounters, future);
     return new SparkJobRef(jobId, sparkJobStatus);
   }
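The change above keeps the foreachAsync submission path: an async action is still the only way to learn the job id at submit time, since the blocking foreach() returns nothing. A minimal sketch of that pattern, assuming Java 8 lambda syntax; the class and method names here are illustrative, not part of the patch:

import java.util.List;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical helper: submit an RDD graph through an async action and
// return the id of the single job it triggers.
public final class AsyncSubmitSketch {
  private AsyncSubmitSketch() {}

  public static int submitAndGetJobId(JavaRDD<String> rdd) {
    // foreachAsync returns immediately with a future instead of blocking.
    JavaFutureAction<Void> future = rdd.foreachAsync(x -> { });
    // One foreach action triggers exactly one Spark job, so the list has one entry.
    List<Integer> jobIds = future.jobIds();
    return jobIds.get(0);
  }
}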
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 5fd43bd..3b13d90 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.spark.JobExecutionStatus;
 
 /**
  * SparkJobMonitor monitor a single Spark job status in a loop until job finished/failed/killed.
@@ -59,26 +60,18 @@ public int startMonitor() {
     boolean done = false;
     int failedCounter = 0;
     int rc = 0;
-    SparkJobState lastState = null;
+    JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = 0;
 
     while (true) {
-      try {
-        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-        SparkJobState state = sparkJobStatus.getState();
-
-        if (state != lastState || state == SparkJobState.RUNNING) {
+      JobExecutionStatus state = sparkJobStatus.getState();
+      if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
         lastState = state;
+        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
         switch (state) {
-        case SUBMITTED:
-          console.printInfo("Status: Submitted");
-          break;
-        case INITING:
-          console.printInfo("Status: Initializing");
-          break;
         case RUNNING:
           if (!running) {
             // print job stages.
@@ -110,14 +103,7 @@ public int startMonitor() {
           running = false;
           done = true;
           break;
-        case KILLED:
-          console.printInfo("Status: Killed");
-          running = false;
-          done = true;
-          rc = 1;
-          break;
         case FAILED:
-        case ERROR:
           console.printError("Status: Failed");
           running = false;
           done = true;
@@ -187,17 +173,17 @@ private void printStatus(Map<String, SparkStageProgress> progressMap, Map<Strin
       if (failed > 0) { /* tasks finished but some failed */
         reportBuffer.append(
-            String.format(
-                "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, failed, total, cost));
+            String.format(
+                "%s: %d(-%d)/%d Finished with failed tasks\t",
+                stageName, complete, failed, total));
       } else {
         if (complete == total) {
           reportBuffer.append(
-              String.format("%s: %d/%d Finished in %,.2fs\t", stageName, complete, total, cost));
+              String.format("%s: %d/%d Finished\t", stageName, complete, total));
         } else {
           reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index bbc9fc3..b5c1837 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.spark.JobExecutionStatus;
 
 import java.util.Map;
 
@@ -29,7 +30,7 @@
 
   public int getJobId();
 
-  public SparkJobState getState();
+  public JobExecutionStatus getState();
 
   public int[] getStageIds();
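With SparkJobState gone, the monitor now drives on Spark's public JobExecutionStatus enum and must tolerate a null state before Spark registers the job. A condensed sketch of that loop shape against the SparkJobStatus interface above; the polling interval, class name, and return codes are illustrative:

import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.spark.JobExecutionStatus;

// Illustrative polling loop over the SparkJobStatus interface declared above.
// getState() may return null until Spark has registered the job, so null is
// treated the same as "still waiting".
public final class MonitorSketch {
  private static final long CHECK_INTERVAL_MS = 1000;

  public static int waitForCompletion(SparkJobStatus jobStatus) throws InterruptedException {
    while (true) {
      JobExecutionStatus state = jobStatus.getState();
      if (state == JobExecutionStatus.SUCCEEDED) {
        return 0;
      }
      if (state == JobExecutionStatus.FAILED) {
        return 1; // non-zero rc, mirroring SparkJobMonitor's failure path
      }
      // null, RUNNING, or UNKNOWN: keep polling.
      Thread.sleep(CHECK_INTERVAL_MS);
    }
  }
}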
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
index f1a2cfe..cfec354 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
@@ -23,23 +23,21 @@
   private int succeededTaskCount;
   private int runningTaskCount;
   private int failedTaskCount;
-  private int killedTaskCount;
-  private long cumulativeTime;
+  // TODO: remove the following two metrics, as they're not available in the current Spark API;
+  // we can add them back once Spark provides them.
+//  private int killedTaskCount;
+//  private long cumulativeTime;
 
   public SparkStageProgress(
       int totalTaskCount,
       int succeededTaskCount,
       int runningTaskCount,
-      int failedTaskCount,
-      int killedTaskCount,
-      long cumulativeTime) {
+      int failedTaskCount) {
     this.totalTaskCount = totalTaskCount;
     this.succeededTaskCount = succeededTaskCount;
     this.runningTaskCount = runningTaskCount;
     this.failedTaskCount = failedTaskCount;
-    this.killedTaskCount = killedTaskCount;
-    this.cumulativeTime = cumulativeTime;
   }
 
   public int getTotalTaskCount() {
@@ -58,14 +56,6 @@ public int getFailedTaskCount() {
     return failedTaskCount;
   }
 
-  public int getKilledTaskCount() {
-    return killedTaskCount;
-  }
-
-  public long getCumulativeTime() {
-    return cumulativeTime;
-  }
-
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof SparkStageProgress) {
@@ -73,8 +63,7 @@ public boolean equals(Object obj) {
       return getTotalTaskCount() == other.getTotalTaskCount()
           && getSucceededTaskCount() == other.getSucceededTaskCount()
           && getRunningTaskCount() == other.getRunningTaskCount()
-          && getFailedTaskCount() == other.getFailedTaskCount()
-          && getKilledTaskCount() == other.getKilledTaskCount();
+          && getFailedTaskCount() == other.getFailedTaskCount();
     }
     return false;
   }
@@ -90,10 +79,6 @@ public String toString() {
     sb.append(getRunningTaskCount());
     sb.append(" Failed: ");
     sb.append(getFailedTaskCount());
-    sb.append(" Killed: ");
-    sb.append(getKilledTaskCount());
-    sb.append(" CumulativeTime: ");
-    sb.append(getCumulativeTime() + "ms");
     return sb.toString();
   }
 }
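With cumulativeTime gone, per-stage elapsed time can no longer be reported, but the four remaining counters still support a useful console line. An illustrative helper, not part of the patch, that derives a completion percentage from them:

import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;

// Illustrative only: format one stage's progress from the four counters
// SparkStageProgress still carries after this change.
public final class ProgressLineSketch {
  public static String format(String stageName, SparkStageProgress progress) {
    int total = progress.getTotalTaskCount();
    int complete = progress.getSucceededTaskCount();
    double percent = total == 0 ? 0.0 : 100.0 * complete / total;
    return String.format("%s: %d/%d (%.1f%%), running: %d, failed: %d",
        stageName, complete, total, percent,
        progress.getRunningTaskCount(), progress.getFailedTaskCount());
  }
}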
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
index 55ca782..d46606d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -26,42 +25,35 @@
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.StageInfo;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.spark.ui.jobs.UIData;
 
 import scala.Option;
-import scala.Tuple2;
-
-import static scala.collection.JavaConversions.bufferAsJavaList;
-import static scala.collection.JavaConversions.mutableMapAsJavaMap;
 
 public class SimpleSparkJobStatus implements SparkJobStatus {
 
+  private final JavaSparkContext sparkContext;
   private int jobId;
+  // After SPARK-2321, we only use JobStateListener to get job metrics.
+  // TODO: remove it when the new API provides equivalent functionality.
   private JobStateListener jobStateListener;
-  private JobProgressListener jobProgressListener;
   private SparkCounters sparkCounters;
   private JavaFutureAction future;
 
-  public SimpleSparkJobStatus(
-      int jobId,
-      JobStateListener stateListener,
-      JobProgressListener progressListener,
-      SparkCounters sparkCounters,
-      JavaFutureAction future) {
-
+  public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+      JobStateListener jobStateListener, SparkCounters sparkCounters,
+      JavaFutureAction future) {
+    this.sparkContext = sparkContext;
     this.jobId = jobId;
-    this.jobStateListener = stateListener;
-    this.jobProgressListener = progressListener;
+    this.jobStateListener = jobStateListener;
     this.sparkCounters = sparkCounters;
     this.future = future;
   }
@@ -72,62 +64,39 @@ public int getJobId() {
   }
 
   @Override
-  public SparkJobState getState() {
+  public JobExecutionStatus getState() {
     // For spark job with empty source data, it's not submitted actually, so we would never
     // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
     // job state.
     if (future.isDone()) {
-      return SparkJobState.SUCCEEDED;
+      return JobExecutionStatus.SUCCEEDED;
    } else {
-      return jobStateListener.getJobState(jobId);
+      // SparkJobInfo may not be available yet
+      SparkJobInfo sparkJobInfo = sparkContext.getJobInfo(jobId);
+      return sparkJobInfo == null ? null : sparkJobInfo.status();
    }
  }
 
   @Override
   public int[] getStageIds() {
-    return jobStateListener.getStageIds(jobId);
+    SparkJobInfo sparkJobInfo = sparkContext.getJobInfo(jobId);
+    return sparkJobInfo == null ? null : sparkJobInfo.stageIds();
   }
 
   @Override
   public Map<String, SparkStageProgress> getSparkStageProgress() {
     Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
-    int[] stageIds = jobStateListener.getStageIds(jobId);
-    if (stageIds != null) {
-      for (int stageId : stageIds) {
-        List<StageInfo> stageInfos = getStageInfo(stageId);
-        for (StageInfo stageInfo : stageInfos) {
-          Tuple2<Object, Object> tuple2 = new Tuple2<Object, Object>(stageInfo.stageId(),
-              stageInfo.attemptId());
-          UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get();
-          if (uiData != null) {
-            int runningTaskCount = uiData.numActiveTasks();
-            int completedTaskCount = uiData.numCompleteTasks();
-            int failedTaskCount = uiData.numFailedTasks();
-            int totalTaskCount = stageInfo.numTasks();
-            int killedTaskCount = 0;
-            long costTime;
-            Option<Object> startOption = stageInfo.submissionTime();
-            Option<Object> completeOption = stageInfo.completionTime();
-            if (startOption.isEmpty()) {
-              costTime = 0;
-            } else if (completeOption.isEmpty()) {
-              long startTime = (Long) startOption.get();
-              costTime = System.currentTimeMillis() - startTime;
-            } else {
-              long startTime = (Long) startOption.get();
-              long completeTime = (Long) completeOption.get();
-              costTime = completeTime - startTime;
-            }
-            SparkStageProgress stageProgress = new SparkStageProgress(
-                totalTaskCount,
-                completedTaskCount,
-                runningTaskCount,
-                failedTaskCount,
-                killedTaskCount,
-                costTime);
-            stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
-          }
-        }
+    for (int stageId : getStageIds()) {
+      SparkStageInfo sparkStageInfo = sparkContext.getStageInfo(stageId);
+      if (sparkStageInfo != null) {
+        int runningTaskCount = sparkStageInfo.numActiveTasks();
+        int completedTaskCount = sparkStageInfo.numCompletedTasks();
+        int failedTaskCount = sparkStageInfo.numFailedTasks();
+        int totalTaskCount = sparkStageInfo.numTasks();
+        SparkStageProgress sparkStageProgress = new SparkStageProgress(
+            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
+            + sparkStageInfo.currentAttemptId(), sparkStageProgress);
       }
     }
     return stageProgresses;
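Note that the new getSparkStageProgress() iterates getStageIds() directly, and getStageIds() can return null before the job is registered, so callers should guard for that. The lookup pattern this class now relies on, sketched against the SPARK-2321 accessors used in the patch (the class name below is illustrative):

import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch of the lookup pattern used above. Both getJobInfo and getStageInfo
// can return null while the job is still being scheduled, so every
// dereference is guarded.
public final class StatusLookupSketch {
  public static void printStages(JavaSparkContext sc, int jobId) {
    SparkJobInfo job = sc.getJobInfo(jobId);
    if (job == null) {
      return; // job not registered with the scheduler yet
    }
    for (int stageId : job.stageIds()) {
      SparkStageInfo stage = sc.getStageInfo(stageId);
      if (stage != null) {
        System.out.printf("stage %d_%d: %d/%d tasks complete%n",
            stage.stageId(), stage.currentAttemptId(),
            stage.numCompletedTasks(), stage.numTasks());
      }
    }
  }
}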
@@ -144,8 +113,8 @@ public SparkStatistics getSparkStatistics() {
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
-    String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+    String jobIdentifier = "Spark Job[" + getJobId() + "] Metrics";
+    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(getJobId());
     if (jobMetric == null) {
       return null;
     }
@@ -160,7 +129,7 @@
 
   @Override
   public void cleanup() {
-    jobStateListener.cleanup(jobId);
+    jobStateListener.cleanup(getJobId());
   }
 
   private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
@@ -241,30 +210,4 @@
     }
     return results;
   }
-
-  private List<StageInfo> getStageInfo(int stageId) {
-    List<StageInfo> stageInfos = new LinkedList<StageInfo>();
-
-    Map<Integer, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
-    List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
-    List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
-
-    if (activeStages.containsKey(stageId)) {
-      stageInfos.add(activeStages.get(stageId));
-    } else {
-      for (StageInfo stageInfo : completedStages) {
-        if (stageInfo.stageId() == stageId) {
-          stageInfos.add(stageInfo);
-        }
-      }
-
-      for (StageInfo stageInfo : failedStages) {
-        if (stageInfo.stageId() == stageId) {
-          stageInfos.add(stageInfo);
-        }
-      }
-    }
-
-    return stageInfos;
-  }
 }
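For a single job-level figure, the per-stage map returned by getSparkStageProgress() can be collapsed into one number; an illustrative aggregation, not part of the patch:

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;

// Illustrative: collapse per-stage progress into one job-level completion
// fraction, e.g. for a single-line console report.
public final class JobProgressSketch {
  public static double overallFraction(Map<String, SparkStageProgress> stageProgress) {
    int total = 0;
    int complete = 0;
    for (SparkStageProgress p : stageProgress.values()) {
      total += p.getTotalTaskCount();
      complete += p.getSucceededTaskCount();
    }
    return total == 0 ? 1.0 : (double) complete / total;
  }
}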