diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index f5d9c4c..c50cf43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -143,7 +143,7 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
     int jobId = future.jobIds().get(0);
     LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(
         sc, jobId, jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future);
-    return new LocalSparkJobRef(Integer.toString(jobId), hiveConf, sparkJobStatus, sc);
+    return new LocalSparkJobRef(Integer.toString(jobId), hiveConf, sparkJobStatus, sc, sparkWork.getSparkTask());
   }

   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a705dfc..8520919 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -207,7 +207,7 @@ private SparkJobRef submit(final DriverContext driverContext, final SparkWork sp
     JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
     JobHandle jobHandle = remoteClient.submit(job);
     RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout);
-    return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
+    return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus, sparkWork.getSparkTask());
   }

   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 87e96a9..0eeb489 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -74,6 +74,14 @@ private static final long serialVersionUID = 1L;
   private transient String sparkJobID;
   private transient SparkStatistics sparkStatistics;

+  private transient long submitTime;
+  private transient long startTime;
+  private transient long finishTime;
+  private transient int succeededTaskCount;
+  private transient int totalTaskCount;
+  private transient int runningTaskCount;
+  private transient int failedTaskCount;
+  private transient List<Integer> stageIds;

   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -94,8 +102,10 @@ public int execute(DriverContext driverContext) {

     SparkWork sparkWork = getWork();
     sparkWork.setRequiredCounterPrefix(getOperatorCounters());
+    sparkWork.setSparkTask(this);

     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+    submitTime = System.currentTimeMillis();
     SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
@@ -130,6 +140,7 @@ public int execute(DriverContext driverContext) {
       LOG.error(msg, e);
       rc = 1;
     } finally {
+      finishTime = System.currentTimeMillis();
       Utilities.clearWork(conf);
       if (sparkSession != null && sparkSessionManager != null) {
         rc = close(rc);
@@ -239,6 +250,62 @@ public SparkStatistics getSparkStatistics() {
     return sparkStatistics;
   }

+  public int getSucceededTaskCount() {
+    return succeededTaskCount;
+  }
+
+  public void setSucceededTaskCount(int complete) {
+    succeededTaskCount = complete;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public void setTotalTaskCount(int total) {
+    totalTaskCount = total;
+  }
+
+  public int getRunningTaskCount() {
+    return runningTaskCount;
+  }
+
+  public void setRunningTaskCount(int running) {
+    runningTaskCount = running;
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  public void setFailedTaskCount(int failed) {
+    failedTaskCount = failed;
+  }
+
+  public List<Integer> getStageIds() {
+    return stageIds;
+  }
+
+  public void setStageIds(List<Integer> ids) {
+    stageIds = ids;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long ts) {
+    startTime = ts;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
   /**
    * Set the number of reducers for the spark work.
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index b6d128b..b8073b5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status;

+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.spark.JobExecutionStatus;

@@ -33,7 +36,11 @@
   private SparkJobStatus sparkJobStatus;

   public LocalSparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
-    super(hiveConf);
+    this(hiveConf, sparkJobStatus, null);
+  }
+
+  public LocalSparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus, SparkTask sparkTask) {
+    super(hiveConf, sparkTask);
     this.sparkJobStatus = sparkJobStatus;
   }

@@ -77,8 +84,14 @@ public int startMonitor() {
           // print job stages.
           console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() + "] stages:");
+          List<Integer> stageIds = new ArrayList<Integer>();
           for (int stageId : sparkJobStatus.getStageIds()) {
             console.printInfo(Integer.toString(stageId));
+            stageIds.add(stageId);
+          }
+          if (task != null) {
+            task.setStartTime(System.currentTimeMillis());
+            task.setStageIds(stageIds);
           }

           console.printInfo("\nStatus: Running (Hive on Spark job["
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 77038fc..9b2c84c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -18,11 +18,14 @@

 package org.apache.hadoop.hive.ql.exec.spark.status;

+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hive.spark.client.JobHandle;
@@ -38,7 +41,11 @@
   private final HiveConf hiveConf;

   public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) {
-    super(hiveConf);
+    this(hiveConf, sparkJobStatus, null);
+  }
+
+  public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus, SparkTask sparkTask) {
+    super(hiveConf, sparkTask);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
   }
@@ -90,6 +97,17 @@ public int startMonitor() {

           console.printInfo("\nStatus: Running (Hive on Spark job["
             + sparkJobStatus.getJobId() + "])");
+          if (task != null) {
+            List<Integer> stageIds = new ArrayList<Integer>();
+            int[] ids = sparkJobStatus.getStageIds();
+            if (ids != null) {
+              for (int stageId : ids) {
+                stageIds.add(stageId);
+              }
+            }
+            task.setStartTime(System.currentTimeMillis());
+            task.setStageIds(stageIds);
+          }
           running = true;

           String format = "Job Progress Format\nCurrentTime StageId_StageAttemptId: "
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index d5b9b5d..6bf3ea0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -20,6 +20,7 @@

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.fusesource.jansi.Ansi;
@@ -78,13 +79,19 @@
   private static final int progressBarChars = 30;

   private final NumberFormat secondsFormat = new DecimalFormat("#0.00");

+  protected SparkTask task;
   protected SparkJobMonitor(HiveConf hiveConf) {
+    this(hiveConf, null);
+  }
+
+  protected SparkJobMonitor(HiveConf hiveConf, SparkTask sparkTask) {
     monitorTimeoutInterval = hiveConf.getTimeVar(
         HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
     inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf);
     console = SessionState.getConsole();
     out = SessionState.LogHelper.getInfoStream();
+    task = sparkTask;
   }

   public abstract int startMonitor();
@@ -180,12 +187,20 @@ private String getReport(Map<String, SparkStageProgress> progressMap) {
     reportBuffer.append(currentDate + "\t");

     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    int sumTotal = 0;
+    int sumComplete = 0;
+    int sumRunning = 0;
+    int sumFailed = 0;
     for (String s : keys) {
       SparkStageProgress progress = progressMap.get(s);
       final int complete = progress.getSucceededTaskCount();
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
       final int failed = progress.getFailedTaskCount();
+      sumTotal += total;
+      sumComplete += complete;
+      sumRunning += running;
+      sumFailed += failed;
       String stageName = "Stage-" + s;
       if (total <= 0) {
         reportBuffer.append(String.format("%s: -/-\t", stageName));
@@ -230,6 +245,12 @@ private String getReport(Map<String, SparkStageProgress> progressMap) {
         }
       }
     }
+    if (task != null) {
+      task.setSucceededTaskCount(sumComplete);
+      task.setTotalTaskCount(sumTotal);
+      task.setRunningTaskCount(sumRunning);
+      task.setFailedTaskCount(sumFailed);
+    }
     return reportBuffer.toString();
   }

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
index ce4d932..0a23a04 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;

 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -29,17 +30,28 @@
   private final HiveConf hiveConf;
   private final LocalSparkJobStatus sparkJobStatus;
   private final JavaSparkContext javaSparkContext;
+  private SparkTask task;

   public LocalSparkJobRef(
       String jobId,
       HiveConf hiveConf,
       LocalSparkJobStatus sparkJobStatus,
       JavaSparkContext javaSparkContext) {
+    this(jobId, hiveConf, sparkJobStatus, javaSparkContext, null);
+  }
+
+  public LocalSparkJobRef(
+      String jobId,
+      HiveConf hiveConf,
+      LocalSparkJobStatus sparkJobStatus,
+      JavaSparkContext javaSparkContext,
+      SparkTask sparkTask) {
     this.jobId = jobId;
     this.hiveConf = hiveConf;
     this.sparkJobStatus = sparkJobStatus;
     this.javaSparkContext = javaSparkContext;
+    this.task = sparkTask;
   }

   @Override
@@ -61,7 +73,7 @@ public boolean cancelJob() {

   @Override
   public int monitorJob() {
-    LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus);
+    LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus, task);
     return localSparkJobMonitor.startMonitor();
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
index 4c0993c..b32009f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;

 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -31,12 +32,19 @@
   private final HiveConf hiveConf;
   private final RemoteSparkJobStatus sparkJobStatus;
   private final JobHandle jobHandler;
+  private SparkTask task;

   public RemoteSparkJobRef(HiveConf hiveConf, JobHandle jobHandler, RemoteSparkJobStatus sparkJobStatus) {
+    this(hiveConf, jobHandler, sparkJobStatus, null);
+  }
+
+  public RemoteSparkJobRef(HiveConf hiveConf, JobHandle jobHandler,
+      RemoteSparkJobStatus sparkJobStatus, SparkTask sparkTask) {
     this.jobHandler = jobHandler;
     this.jobId = jobHandler.getClientJobId();
     this.hiveConf = hiveConf;
     this.sparkJobStatus = sparkJobStatus;
+    this.task = sparkTask;
   }

   @Override
@@ -56,7 +64,7 @@ public boolean cancelJob() {

   @Override
   public int monitorJob() {
-    RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus);
+    RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus, task);
     return remoteSparkJobMonitor.startMonitor();
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index bb5dd79..e417218 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -34,6 +34,7 @@

 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;

 import com.google.common.base.Preconditions;
@@ -62,6 +63,8 @@
   private Map cloneToWork;

+  private transient SparkTask task;
+
   public SparkWork(String name) {
     this.name = name + ":" + (++counter);
     cloneToWork = new HashMap();
   }
@@ -280,6 +283,14 @@ public void connect(BaseWork a, BaseWork b, SparkEdgeProperty edgeProp) {
     edgeProperties.put(workPair, edgeProp);
   }

+  public SparkTask getSparkTask() {
+    return task;
+  }
+
+  public void setSparkTask(SparkTask sparkTask) {
+    this.task = sparkTask;
+  }
+
   /*
    * Dependency is a class used for explain
   */
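
For context, a minimal sketch of how a caller could consume the fields this patch records on SparkTask once a query finishes. The SparkTaskSummaryPrinter class below is hypothetical and purely illustrative; the only APIs taken from the patch are the SparkTask getters (stage IDs, task counts, and the submit/start/finish timestamps set by SparkTask.execute() and the monitors).

```java
import java.util.List;

import org.apache.hadoop.hive.ql.exec.spark.SparkTask;

// Hypothetical helper, not part of this patch: formats the metrics that the
// monitors now record on SparkTask.
public class SparkTaskSummaryPrinter {

  public static String summarize(SparkTask task) {
    // Time between job submission and the monitor observing the stages start.
    long queuedMs = task.getStartTime() - task.getSubmitTime();
    // Time from stage start until the finally-block in execute() stamps finishTime.
    long runMs = task.getFinishTime() - task.getStartTime();

    List<Integer> stageIds = task.getStageIds();
    return "stages=" + stageIds
        + ", tasks=" + task.getSucceededTaskCount() + "/" + task.getTotalTaskCount()
        + " (running=" + task.getRunningTaskCount()
        + ", failed=" + task.getFailedTaskCount() + ")"
        + ", queued=" + queuedMs + " ms, run=" + runMs + " ms";
  }
}
```

Because the counts are refreshed every time SparkJobMonitor.getReport() runs, reading them after the monitor returns yields the final totals; reading them mid-run would reflect the most recent progress snapshot.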