diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 87e96a9..f836065 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -74,6 +76,13 @@
   private static final long serialVersionUID = 1L;
   private transient String sparkJobID;
   private transient SparkStatistics sparkStatistics;
+  private transient long submitTime;
+  private transient long startTime;
+  private transient long finishTime;
+  private transient int succeededTaskCount;
+  private transient int totalTaskCount;
+  private transient int failedTaskCount;
+  private transient List<Integer> stageIds;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -96,6 +105,7 @@ public int execute(DriverContext driverContext) {
       sparkWork.setRequiredCounterPrefix(getOperatorCounters());
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+      submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
@@ -104,6 +114,7 @@ public int execute(DriverContext driverContext) {
       this.jobID = jobRef.getSparkJobStatus().getAppID();
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+      getSparkJobInfo(sparkJobStatus);
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
@@ -130,6 +141,8 @@ public int execute(DriverContext driverContext) {
       LOG.error(msg, e);
       rc = 1;
     } finally {
+      startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+      finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB);
       Utilities.clearWork(conf);
       if (sparkSession != null && sparkSessionManager != null) {
         rc = close(rc);
@@ -239,6 +252,34 @@
   public SparkStatistics getSparkStatistics() {
     return sparkStatistics;
   }
+  public int getSucceededTaskCount() {
+    return succeededTaskCount;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  public List<Integer> getStageIds() {
+    return stageIds;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
   /**
    * Set the number of reducers for the spark work.
    */
@@ -288,4 +329,34 @@ private void printConfigInfo() throws IOException {
 
     return counters;
   }
+
+  private void getSparkJobInfo(SparkJobStatus sparkJobStatus) {
+    try {
+      stageIds = new ArrayList<Integer>();
+      int[] ids = sparkJobStatus.getStageIds();
+      if (ids != null) {
+        for (int stageId : ids) {
+          stageIds.add(stageId);
+        }
+      }
+      Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+      int sumTotal = 0;
+      int sumComplete = 0;
+      int sumFailed = 0;
+      for (String s : progressMap.keySet()) {
+        SparkStageProgress progress = progressMap.get(s);
+        final int complete = progress.getSucceededTaskCount();
+        final int total = progress.getTotalTaskCount();
+        final int failed = progress.getFailedTaskCount();
+        sumTotal += total;
+        sumComplete += complete;
+        sumFailed += failed;
+      }
+      succeededTaskCount = sumComplete;
+      totalTaskCount = sumTotal;
+      failedTaskCount = sumFailed;
+    } catch (Exception e) {
+      LOG.error("Failed to get Spark job information", e);
+    }
+  }
 }
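
Note on usage (not part of the patch): the getters added above expose the PerfLogger timestamps recorded in execute() together with the per-job stage ids and aggregated task counts collected by getSparkJobInfo(). The sketch below is a minimal, illustrative consumer of those accessors; the SparkTaskSummary class and summarize() method are hypothetical names, and it assumes the PerfLogger start/end times are epoch milliseconds (how Hive's PerfLogger records them), while the SparkTask getters themselves are the ones introduced in this diff.

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.ql.exec.spark.SparkTask;

// Hypothetical helper, not part of the patch: builds a one-line summary from the
// fields that SparkTask.execute() now populates via PerfLogger and getSparkJobInfo().
public class SparkTaskSummary {

  public static String summarize(SparkTask task) {
    // Differences of the PerfLogger timestamps give durations in milliseconds.
    long queuedMs = task.getStartTime() - task.getSubmitTime();   // submitted -> running
    long runningMs = task.getFinishTime() - task.getStartTime();  // running -> finished
    List<Integer> stageIds = task.getStageIds();
    return String.format(
        "stages=%s tasks total=%d succeeded=%d failed=%d queued=%ds running=%ds",
        stageIds,
        task.getTotalTaskCount(),
        task.getSucceededTaskCount(),
        task.getFailedTaskCount(),
        TimeUnit.MILLISECONDS.toSeconds(queuedMs),
        TimeUnit.MILLISECONDS.toSeconds(runningMs));
  }
}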