commit 8d2f6cf51f3150dbfbad4007f2a64b001eaf1152 Author: Bharath Krishna Date: Wed May 30 13:08:44 2018 -0700 HIVE-19508 : SparkJobMonitor getReport doesn't print stage progress in order. Refactoring the progress map to use SparkStage instead of String as key Adding unit tests for getReport. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 803877162d5fe6b16e0ec90051d0afbfcb4daab2..635eb88108c6e367ea1c8390b6a73036a0174671 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -470,11 +471,11 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus) { stageIds.add(stageId); } } - Map progressMap = sparkJobStatus.getSparkStageProgress(); + Map progressMap = sparkJobStatus.getSparkStageProgress(); int sumTotal = 0; int sumComplete = 0; int sumFailed = 0; - for (String s : progressMap.keySet()) { + for (SparkStage s : progressMap.keySet()) { SparkStageProgress progress = progressMap.get(s); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java index 4ce9f53a3732460adfb887466627d02c84f7111c..2a6c33bfd4824c96e7004cd1ecce48c62c97d685 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -42,7 +42,7 @@ public int startMonitor() { boolean done = false; int rc = 0; JobExecutionStatus lastState = null; - Map lastProgressMap = null; + Map lastProgressMap = null; perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); @@ -68,7 +68,7 @@ public int startMonitor() { } } else if (state != lastState || state == JobExecutionStatus.RUNNING) { lastState = state; - Map progressMap = sparkJobStatus.getSparkStageProgress(); + Map progressMap = sparkJobStatus.getSparkStageProgress(); switch (state) { case RUNNING: diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 98c228b26837e20e05dc9bfe3a4c71cea1dc4485..004b50ba95934280cf302055a46a5d984b421e07 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -56,7 +56,7 @@ public int startMonitor() { boolean running = false; boolean done = false; int rc = 0; - Map lastProgressMap = null; + Map lastProgressMap = null; perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); @@ -89,7 +89,7 @@ public int startMonitor() { case STARTED: JobExecutionStatus sparkJobState = sparkJobStatus.getState(); if (sparkJobState == JobExecutionStatus.RUNNING) { - Map progressMap = sparkJobStatus.getSparkStageProgress(); + Map progressMap = sparkJobStatus.getSparkStageProgress(); if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); printAppInfo(); @@ -137,7 +137,7 @@ public int startMonitor() { } break; case SUCCEEDED: - Map progressMap = sparkJobStatus.getSparkStageProgress(); + Map progressMap = sparkJobStatus.getSparkStageProgress(); printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; double duration = (System.currentTimeMillis() - startTime) / 1000.0; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 7afd8864075aa0d9708274eea8839c662324c732..e78b1cd6637c46070378c25a372916817fe99a59 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -88,7 +88,7 @@ protected SparkJobMonitor(HiveConf hiveConf) { public abstract int startMonitor(); - private void printStatusInPlace(Map progressMap) { + private void printStatusInPlace(Map progressMap) { StringBuilder reportBuffer = new StringBuilder(); @@ -104,11 +104,11 @@ private void printStatusInPlace(Map progressMap) { reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); reprintLine(SEPARATOR); - SortedSet keys = new TreeSet(progressMap.keySet()); + SortedSet keys = new TreeSet(progressMap.keySet()); int idx = 0; final int numKey = keys.size(); - for (String s : keys) { - SparkStageProgress progress = progressMap.get(s); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); @@ -116,6 +116,7 @@ private void printStatusInPlace(Map progressMap) { sumTotal += total; sumComplete += complete; + String s = stage.toString(); StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED; if (complete > 0 || running > 0 || failed > 0) { if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { @@ -130,9 +131,8 @@ private void printStatusInPlace(Map progressMap) { } } - int div = s.indexOf('_'); - String attempt = div > 0 ? s.substring(div + 1) : "-"; - String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s); + String attempt = String.valueOf(stage.getAttemptId()); + String stageName = "Stage-" + String.valueOf(stage.getStageId()); String nameWithProgress = getNameWithProgress(stageName, complete, total); final int pending = total - complete - running; @@ -151,8 +151,8 @@ private void printStatusInPlace(Map progressMap) { reprintLine(SEPARATOR); } - protected void printStatus(Map progressMap, - Map lastProgressMap) { + protected void printStatus(Map progressMap, + Map lastProgressMap) { // do not print duplicate status while still in middle of print interval. boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap); @@ -172,7 +172,7 @@ protected void printStatus(Map progressMap, lastPrintTime = System.currentTimeMillis(); } - protected int getTotalTaskCount(Map progressMap) { + protected int getTotalTaskCount(Map progressMap) { int totalTasks = 0; for (SparkStageProgress progress: progressMap.values() ) { totalTasks += progress.getTotalTaskCount(); @@ -181,7 +181,7 @@ protected int getTotalTaskCount(Map progressMap) { return totalTasks; } - protected int getStageMaxTaskCount(Map progressMap) { + protected int getStageMaxTaskCount(Map progressMap) { int stageMaxTasks = 0; for (SparkStageProgress progress: progressMap.values() ) { int tasks = progress.getTotalTaskCount(); @@ -193,7 +193,7 @@ protected int getStageMaxTaskCount(Map progressMap) return stageMaxTasks; } - private String getReport(Map progressMap) { + private String getReport(Map progressMap) { StringBuilder reportBuffer = new StringBuilder(); SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); String currentDate = dt.format(new Date()); @@ -203,16 +203,16 @@ private String getReport(Map progressMap) { int sumTotal = 0; int sumComplete = 0; - SortedSet keys = new TreeSet(progressMap.keySet()); - for (String s : keys) { - SparkStageProgress progress = progressMap.get(s); + SortedSet keys = new TreeSet(progressMap.keySet()); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); final int failed = progress.getFailedTaskCount(); sumTotal += total; sumComplete += complete; - + String s = stage.toString(); String stageName = "Stage-" + s; if (total <= 0) { reportBuffer.append(String.format("%s: -/-\t", stageName)); @@ -266,8 +266,8 @@ private String getReport(Map progressMap) { } private boolean isSameAsPreviousProgress( - Map progressMap, - Map lastProgressMap) { + Map progressMap, + Map lastProgressMap) { if (lastProgressMap == null) { return false; @@ -282,7 +282,7 @@ private boolean isSameAsPreviousProgress( if (progressMap.size() != lastProgressMap.size()) { return false; } - for (String key : progressMap.keySet()) { + for (SparkStage key : progressMap.keySet()) { if (!lastProgressMap.containsKey(key) || !progressMap.get(key).equals(lastProgressMap.get(key))) { return false; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index 1e584f4f7a53c3424023451dc161e488a3f3da27..e8596c692db8a0fe4d9452109a4c822a4cc4a489 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -37,7 +37,7 @@ int[] getStageIds() throws HiveException; - Map getSparkStageProgress() throws HiveException; + Map getSparkStageProgress() throws HiveException; SparkCounters getCounter(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java new file mode 100644 index 0000000000000000000000000000000000000000..6228d96b17382adfc7eca3a90e722139f41cb5e6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStage.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import java.util.Objects; + +public class SparkStage implements Comparable { + + private int stageId; + private int attemptId; + + public SparkStage(int stageId, int attemptId) { + this.stageId = stageId; + this.attemptId = attemptId; + } + + public int getStageId() { + return stageId; + } + + public int getAttemptId() { + return attemptId; + } + + @Override + public int compareTo(SparkStage stage) { + if (this.stageId == stage.stageId) { + return Integer.compare(this.attemptId, stage.attemptId); + } + return Integer.compare(this.stageId, stage.stageId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SparkStage that = (SparkStage) o; + return getStageId() == that.getStageId() && getAttemptId() == that.getAttemptId(); + } + + @Override + public int hashCode() { + return Objects.hash(stageId, attemptId); + } + + @Override + public String toString() { + return String.valueOf(this.stageId) + "_" + String.valueOf(this.attemptId); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 4368eb0da5d4e835e9e3e68876f80ff719a60e01..0b74ffe0e3adc8b8deac5c6201f221f457dd48a4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -22,8 +22,7 @@ import java.util.Map; import java.util.Set; -import com.google.common.base.Throwables; - +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,8 +104,8 @@ public JobExecutionStatus getState() { } @Override - public Map getSparkStageProgress() { - Map stageProgresses = new HashMap(); + public Map getSparkStageProgress() { + Map stageProgresses = new HashMap(); for (int stageId : getStageIds()) { SparkStageInfo sparkStageInfo = getStageInfo(stageId); if (sparkStageInfo != null) { @@ -116,8 +115,8 @@ public JobExecutionStatus getState() { int totalTaskCount = sparkStageInfo.numTasks(); SparkStageProgress sparkStageProgress = new SparkStageProgress( totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" - + sparkStageInfo.currentAttemptId(), sparkStageProgress); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); } } return stageProgresses; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index e4a53fb134e0e727bb5ad5802da098b673ab0c97..d2e28b02bae6983a8b5aa18d85e54675f0aa2a24 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,8 +99,8 @@ public JobExecutionStatus getState() throws HiveException { } @Override - public Map getSparkStageProgress() throws HiveException { - Map stageProgresses = new HashMap(); + public Map getSparkStageProgress() throws HiveException { + Map stageProgresses = new HashMap(); for (int stageId : getStageIds()) { SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId); if (sparkStageInfo != null && sparkStageInfo.name() != null) { @@ -109,8 +110,8 @@ public JobExecutionStatus getState() throws HiveException { int totalTaskCount = sparkStageInfo.numTasks(); SparkStageProgress sparkStageProgress = new SparkStageProgress( totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" - + sparkStageInfo.currentAttemptId(), sparkStageProgress); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); } } return stageProgresses; diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java new file mode 100644 index 0000000000000000000000000000000000000000..e66354f0869738bd3cf0eb831c13fa6af1eda256 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +/** + * Test spark progress monitoring information. + */ +public class TestSparkJobMonitor { + + private HiveConf testConf; + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + private SparkJobMonitor monitor; + private PrintStream curOut; + private PrintStream curErr; + + @Before + public void setUp() { + testConf = new HiveConf(); + curOut = System.out; + curErr = System.err; + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + + monitor = new SparkJobMonitor(testConf) { + @Override + public int startMonitor() { + return 0; + } + }; + + } + + private Map progressMap() { + return new HashMap() {{ + put(new SparkStage(1, 0), new SparkStageProgress(4, 3, 1, 0)); + put(new SparkStage(3, 1), new SparkStageProgress(6, 4, 1, 1)); + put(new SparkStage(9, 0), new SparkStageProgress(5, 5, 0, 0)); + put(new SparkStage(10, 2), new SparkStageProgress(5, 3, 2, 0)); + put(new SparkStage(15, 1), new SparkStageProgress(4, 3, 1, 0)); + put(new SparkStage(15, 2), new SparkStageProgress(4, 4, 0, 0)); + put(new SparkStage(20, 3), new SparkStageProgress(3, 1, 1, 1)); + put(new SparkStage(21, 1), new SparkStageProgress(2, 2, 0, 0)); + }}; + } + + @Test + public void testGetReport() { + Map progressMap = progressMap(); + monitor.printStatus(progressMap, null); + assertTrue(errContent.toString().contains( + "Stage-1_0: 3(+1)/4\tStage-3_1: 4(+1,-1)/6\tStage-9_0: 5/5 Finished\tStage-10_2: 3(+2)/5\t" + + "Stage-15_1: 3(+1)/4\tStage-15_2: 4/4 Finished\tStage-20_3: 1(+1,-1)/3\tStage-21_1: 2/2 Finished")); + } + + @After + public void tearDown() { + System.setOut(curOut); + System.setErr(curErr); + } +}