diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index b092abc..00939a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -25,7 +27,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; /** @@ -39,7 +40,7 @@ private transient LogHelper console; private final int checkInterval = 200; - private final int maxRetryInterval = 2500; + private final int maxRetryInterval = 2500; private final int printInterval = 3000; private long lastPrintTime; private Set completed; @@ -59,19 +60,19 @@ public int startMonitor() { int failedCounter = 0; int rc = 0; SparkJobState lastState = null; - String lastReport = null; + Map lastProgressMap = null; long startTime = 0; - while(true) { + while (true) { try { - Map progressMap = sparkJobStatus.getSparkStageProgress(); + Map progressMap = sparkJobStatus.getSparkStageProgress(); SparkJobState state = sparkJobStatus.getState(); if (state != lastState || state == SparkJobState.RUNNING) { lastState = state; - switch(state) { + switch (state) { case SUBMITTED: console.printInfo("Status: Submitted"); break; @@ -88,16 +89,22 @@ public int startMonitor() { } console.printInfo("\nStatus: Running (Hive on Spark job[" + - sparkJobStatus.getJobId() + "])\n"); + sparkJobStatus.getJobId() + "])"); startTime = System.currentTimeMillis(); running = true; + + console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " + + 
"SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); } - lastReport = printStatus(progressMap, lastReport, console); + + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; break; case SUCCEEDED: - lastReport = printStatus(progressMap, lastReport, console); - double duration = (System.currentTimeMillis() - startTime)/1000.0; + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + double duration = (System.currentTimeMillis() - startTime) / 1000.0; console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration)); running = false; @@ -122,8 +129,8 @@ public int startMonitor() { Thread.sleep(checkInterval); } } catch (Exception e) { - console.printInfo("Exception: "+e.getMessage()); + console.printInfo("Exception: " + e.getMessage()); - if (++failedCounter % maxRetryInterval/checkInterval == 0 + if (++failedCounter % (maxRetryInterval / checkInterval) == 0 || e instanceof InterruptedException) { console.printInfo("Killing Job..."); console.printError("Execution has failed."); @@ -141,53 +148,97 @@ public int startMonitor() { return rc; } - private String printStatus( - Map progressMap, - String lastReport, - LogHelper console) { + private void printStatus(Map progressMap, Map lastProgressMap) { + + // do not print duplicate status while still in middle of print interval. 
+ boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap); + boolean isWithinInterval = System.currentTimeMillis() <= lastPrintTime + printInterval; + if (isDuplicateState && isWithinInterval) { + return; + } StringBuffer reportBuffer = new StringBuffer(); + SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); + String currentDate = dt.format(new Date()); + reportBuffer.append(currentDate + "\t"); SortedSet keys = new TreeSet(progressMap.keySet()); - for (String s: keys) { - SparkProgress progress = progressMap.get(s); + for (String s : keys) { + SparkStageProgress progress = progressMap.get(s); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); final int failed = progress.getFailedTaskCount(); + String stageName = "Stage-" + s; if (total <= 0) { - reportBuffer.append(String.format("%s: -/-\t", s, complete, total)); + reportBuffer.append(String.format("%s: -/-\t", stageName, complete, total)); } else { if (complete == total && !completed.contains(s)) { completed.add(s); } - if(complete < total && (complete > 0 || running > 0 || failed > 0)) { + if (complete < total && (complete > 0 || running > 0 || failed > 0)) { /* stage is started, but not complete */ if (failed > 0) { reportBuffer.append( - String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total)); + String.format( + "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total)); } else { - reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total)); + reportBuffer.append( + String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total)); } } else { + double cost = progress.getCumulativeTime() / 1000.0; /* stage is waiting for input/slots or complete */ if (failed > 0) { /* tasks finished but some failed */ - 
reportBuffer.append( + String.format( + "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, failed, total, cost)); } else { - reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total)); + if (complete == total) { + reportBuffer.append( + String.format("%s: %d/%d Finished in %,.2fs\t", stageName, complete, total, cost)); + } else { + reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total)); + } } } } } - String report = reportBuffer.toString(); - if (!report.equals(lastReport) - || System.currentTimeMillis() >= lastPrintTime + printInterval) { - console.printInfo(report); - lastPrintTime = System.currentTimeMillis(); + lastPrintTime = System.currentTimeMillis(); + console.printInfo(reportBuffer.toString()); + } + + private boolean isSameAsPreviousProgress( + Map progressMap, + Map lastProgressMap) { + + if (lastProgressMap == null) { + return false; } - return report; + if (progressMap.isEmpty()) { + if (lastProgressMap.isEmpty()) { + return true; + } else { + return false; + } + } else { + if (lastProgressMap.isEmpty()) { + return false; + } else { + if (progressMap.size() != lastProgressMap.size()) { + return false; + } + for (String key : progressMap.keySet()) { + if (!lastProgressMap.containsKey(key) || + !progressMap.get(key).equals(lastProgressMap.get(key))) { + return false; + } + } + } + } + return true; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index 8717fe2..25cb60f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -28,10 +28,8 @@ public SparkJobState getState(); - public SparkProgress getSparkJobProgress(); - public int[] getStageIds(); - public Map getSparkStageProgress(); + public Map getSparkStageProgress(); } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java deleted file mode 100644 index 36322eb..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec.spark.status; - -public class SparkProgress { - - private int totalTaskCount; - private int succeededTaskCount; - private int runningTaskCount; - private int failedTaskCount; - private int killedTaskCount; - - public SparkProgress( - int totalTaskCount, - int succeededTaskCount, - int runningTaskCount, - int failedTaskCount, - int killedTaskCount) { - - this.totalTaskCount = totalTaskCount; - this.succeededTaskCount = succeededTaskCount; - this.runningTaskCount = runningTaskCount; - this.failedTaskCount = failedTaskCount; - this.killedTaskCount = killedTaskCount; - } - - public int getTotalTaskCount() { - return totalTaskCount; - } - - public int getSucceededTaskCount() { - return succeededTaskCount; - } - - public int getRunningTaskCount() { - return runningTaskCount; - } - - public int getFailedTaskCount() { - return failedTaskCount; - } - - public int getKilledTaskCount() { - return killedTaskCount; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof SparkProgress) { - SparkProgress other = (SparkProgress) obj; - return getTotalTaskCount() == other.getTotalTaskCount() - && getSucceededTaskCount() == other.getSucceededTaskCount() - && getRunningTaskCount() == other.getRunningTaskCount() - && getFailedTaskCount() == other.getFailedTaskCount() - && getKilledTaskCount() == other.getKilledTaskCount(); - } - return false; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("TotalTasks: "); - sb.append(getTotalTaskCount()); - sb.append(" Succeeded: "); - sb.append(getSucceededTaskCount()); - sb.append(" Running: "); - sb.append(getRunningTaskCount()); - sb.append(" Failed: "); - sb.append(getFailedTaskCount()); - sb.append(" Killed: "); - sb.append(getKilledTaskCount()); - return sb.toString(); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java new file mode 100644 index 0000000..f1a2cfe --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +public class SparkStageProgress { + + private int totalTaskCount; + private int succeededTaskCount; + private int runningTaskCount; + private int failedTaskCount; + private int killedTaskCount; + private long cumulativeTime; + + public SparkStageProgress( + int totalTaskCount, + int succeededTaskCount, + int runningTaskCount, + int failedTaskCount, + int killedTaskCount, + long cumulativeTime) { + + this.totalTaskCount = totalTaskCount; + this.succeededTaskCount = succeededTaskCount; + this.runningTaskCount = runningTaskCount; + this.failedTaskCount = failedTaskCount; + this.killedTaskCount = killedTaskCount; + this.cumulativeTime = cumulativeTime; + } + + public int getTotalTaskCount() { + return totalTaskCount; + } + + public int getSucceededTaskCount() { + return succeededTaskCount; + } + + public int getRunningTaskCount() { + return runningTaskCount; + } + + public int getFailedTaskCount() { + return failedTaskCount; + } + + public int getKilledTaskCount() { + return killedTaskCount; + } + + public long getCumulativeTime() { + return cumulativeTime; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SparkStageProgress) { + SparkStageProgress other = (SparkStageProgress) obj; + return getTotalTaskCount() == other.getTotalTaskCount() + && getSucceededTaskCount() == other.getSucceededTaskCount() + && getRunningTaskCount() == other.getRunningTaskCount() + && getFailedTaskCount() == other.getFailedTaskCount() + && getKilledTaskCount() == other.getKilledTaskCount(); + } + return false; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("TotalTasks: "); + sb.append(getTotalTaskCount()); + sb.append(" Succeeded: "); + sb.append(getSucceededTaskCount()); + sb.append(" Running: "); + sb.append(getRunningTaskCount()); + sb.append(" Failed: "); + sb.append(getFailedTaskCount()); + sb.append(" Killed: "); + 
sb.append(getKilledTaskCount()); + sb.append(" CumulativeTime: "); + sb.append(getCumulativeTime() + "ms"); + return sb.toString(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java index c7ef83c..6570b0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java @@ -24,11 +24,12 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkProgress; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; import org.apache.spark.scheduler.StageInfo; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.spark.ui.jobs.UIData; +import scala.Option; import scala.Tuple2; import static scala.collection.JavaConversions.bufferAsJavaList; @@ -61,35 +62,13 @@ public SparkJobState getState() { } @Override - public SparkProgress getSparkJobProgress() { - Map stageProgresses = getSparkStageProgress(); - - int totalTaskCount = 0; - int runningTaskCount = 0; - int completedTaskCount = 0; - int failedTaskCount = 0; - int killedTaskCount = 0; - - for (SparkProgress sparkProgress : stageProgresses.values()) { - totalTaskCount += sparkProgress.getTotalTaskCount(); - runningTaskCount += sparkProgress.getRunningTaskCount(); - completedTaskCount += sparkProgress.getSucceededTaskCount(); - failedTaskCount += sparkProgress.getFailedTaskCount(); - killedTaskCount += sparkProgress.getKilledTaskCount(); - } - - return new SparkProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount, killedTaskCount); - } - - @Override public int[] getStageIds() { return jobStateListener.getStageIds(jobId); } @Override - public Map getSparkStageProgress() 
{ - Map stageProgresses = new HashMap(); + public Map getSparkStageProgress() { + Map stageProgresses = new HashMap(); int[] stageIds = jobStateListener.getStageIds(jobId); if (stageIds != null) { for (int stageId : stageIds) { @@ -104,12 +83,26 @@ public SparkProgress getSparkJobProgress() { int failedTaskCount = uiData.numFailedTasks(); int totalTaskCount = stageInfo.numTasks(); int killedTaskCount = 0; - SparkProgress stageProgress = new SparkProgress( + long costTime; + Option startOption = stageInfo.submissionTime(); + Option completeOption = stageInfo.completionTime(); + if (startOption.isEmpty()) { + costTime = 0; + } else if (completeOption.isEmpty()) { + long startTime = (Long)startOption.get(); + costTime = System.currentTimeMillis() - startTime; + } else { + long startTime = (Long)startOption.get(); + long completeTime = (Long)completeOption.get(); + costTime = completeTime - startTime; + } + SparkStageProgress stageProgress = new SparkStageProgress( totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount, - killedTaskCount); + killedTaskCount, + costTime); stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress); } }