diff --git common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java index bfdb4fa..6db5c18 100644 --- common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java +++ common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java @@ -163,6 +163,7 @@ public String apply(@Nullable List row) { progressStr, elapsedTime); + reprintLine(SEPARATOR); reprintLineWithColorAsBold(footer, Ansi.Color.RED); reprintLine(SEPARATOR); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java index 5840ad6..1400be4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java @@ -26,16 +26,11 @@ class DAGSummary implements PrintSummary { - private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34; - private static final String FILE_HEADER_SEPARATOR = new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-"); - - private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s"; + private static final String FILE_HEADER_SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-"); + private static final String FORMATTING_PATTERN = "%10s %17s %14s %14s %15s %16s"; private static final String FILE_HEADER = String.format( FORMATTING_PATTERN, "VERTICES", - "TOTAL_TASKS", - "FAILED_ATTEMPTS", - "KILLED_TASKS", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", @@ -170,9 +165,6 @@ private String vertexSummary(String vertexName, Progress progress, VertexStatus return String.format(FORMATTING_PATTERN, vertexName, - progress.getTotalTaskCount(), - progress.getFailedTaskAttemptCount(), - progress.getKilledTaskAttemptCount(), secondsFormatter.format((duration)), commaFormatter.format(cpuTimeMillis), commaFormatter.format(gcTimeMillis), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java new file mode 100644 index 0000000..e8b80f9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java @@ -0,0 +1,146 @@ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.Progress; + +import java.io.StringWriter; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +class RenderStrategy { + + interface UpdateFunction { + void update(DAGStatus status, Map vertexProgressMap); + } + + private abstract static class DefaultUpdateFunction implements UpdateFunction { + private static final int PRINT_INTERVAL = 3000; + + final TezJobMonitor monitor; + private final PerfLogger perfLogger; + + private long lastPrintTime = 0L; + private String lastReport = null; + + DefaultUpdateFunction(TezJobMonitor monitor) { + this.monitor = monitor; + perfLogger = SessionState.getPerfLogger(); + } + + @Override + public void update(DAGStatus status, Map vertexProgressMap) { + renderProgress(monitor.progressMonitor(status, vertexProgressMap)); + String report = getReport(vertexProgressMap); + if (showReport(report)) { + renderReport(report); + lastReport = report; + lastPrintTime = System.currentTimeMillis(); + } + } + + private boolean showReport(String report) { + return !report.equals(lastReport) + || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL; + } + + private String getReport(Map progressMap) { + StringWriter reportBuffer = new StringWriter(); + + SortedSet keys = new TreeSet(progressMap.keySet()); + for (String s : keys) { + Progress progress = progressMap.get(s); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + final int running = progress.getRunningTaskCount(); + final int failed = progress.getFailedTaskAttemptCount(); + if (total <= 0) { + reportBuffer.append(String.format("%s: -/-\t", s)); + } else { + if (complete == total) { + /* + * We may have missed the start of the vertex due to the 3 seconds interval + */ + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + + perfLogger.PerfLogEnd(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + if (complete < total && (complete > 0 || running > 0 || failed > 0)) { + + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + + /* vertex is started, but not complete */ + if (failed > 0) { + reportBuffer.append( + String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total)); + } else { + reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total)); + } + } else { + /* vertex is waiting for input/slots or complete */ + if (failed > 0) { + /* tasks finished but some failed */ + reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total)); + } else { + reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total)); + } + } + } + } + + return reportBuffer.toString(); + } + + abstract void renderProgress(ProgressMonitor progressMonitor); + + abstract void renderReport(String report); + } + + static class LogToFileFunction extends DefaultUpdateFunction { + + LogToFileFunction(TezJobMonitor monitor) { + super(monitor); + } + + @Override + public void renderProgress(ProgressMonitor progressMonitor) { + SessionState.get().updateProgressMonitor(progressMonitor); + } + + @Override + public void renderReport(String report) { + monitor.console.printInfo(report); + } + } + + static class InPlaceUpdateFunction extends DefaultUpdateFunction { + /** + * Have to use the same instance to render else the number lines printed earlier is lost and the + * screen will print the table again and again. + */ + private final InPlaceUpdate inPlaceUpdate; + + InPlaceUpdateFunction(TezJobMonitor monitor) { + super(monitor); + inPlaceUpdate = new InPlaceUpdate(SessionState.LogHelper.getInfoStream()); + } + + @Override + public void renderProgress(ProgressMonitor progressMonitor) { + inPlaceUpdate.render(progressMonitor); + } + + @Override + public void renderReport(String report) { + monitor.console.logInfo(report); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index c0a068d..f2f97f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -47,8 +47,6 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; @@ -59,24 +57,18 @@ Licensed to the Apache Software Foundation (ASF) under one */ public class TezJobMonitor { - private static final String CLASS_NAME = TezJobMonitor.class.getName(); + static final String CLASS_NAME = TezJobMonitor.class.getName(); private static final int CHECK_INTERVAL = 200; private static final int MAX_RETRY_INTERVAL = 2500; - private static final int PRINT_INTERVAL = 3000; private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final List shutdownList; private final Map workMap; - private transient LogHelper console; + transient LogHelper console; - private long lastPrintTime; private StringWriter diagnostics = new StringWriter(); - interface UpdateFunction { - void update(DAGStatus status, Map vertexProgressMap, String report); - } - static { shutdownList = new LinkedList<>(); ShutdownHookManager.addShutdownHook(new Runnable() { @@ -102,12 +94,7 @@ public static void initShutdownHook() { private final DAG dag; private final Context context; private long executionStartTime = 0; - private final UpdateFunction updateFunction; - /** - * Have to use the same instance to render else the number lines printed earlier is lost and the - * screen will print the table again and again. - */ - private final InPlaceUpdate inPlaceUpdate; + private final RenderStrategy.UpdateFunction updateFunction; public TezJobMonitor(Map workMap, final DAGClient dagClient, HiveConf conf, DAG dag, Context ctx) { @@ -117,29 +104,15 @@ public TezJobMonitor(Map workMap, final DAGClient dagClient, H this.dag = dag; this.context = ctx; console = SessionState.getConsole(); - inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream()); updateFunction = updateFunction(); } - private UpdateFunction updateFunction() { - UpdateFunction logToFileFunction = new UpdateFunction() { - @Override - public void update(DAGStatus status, Map vertexProgressMap, String report) { - SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap)); - console.printInfo(report); - } - }; - UpdateFunction inPlaceUpdateFunction = new UpdateFunction() { - @Override - public void update(DAGStatus status, Map vertexProgressMap, String report) { - inPlaceUpdate.render(progressMonitor(status, vertexProgressMap)); - console.logInfo(report); - } - }; + private RenderStrategy.UpdateFunction updateFunction() { return InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent() && !SessionState.get().isHiveServerQuery() - ? inPlaceUpdateFunction : logToFileFunction; + ? new RenderStrategy.InPlaceUpdateFunction(this) + : new RenderStrategy.LogToFileFunction(this); } private boolean isProfilingEnabled() { @@ -163,7 +136,6 @@ public int monitorExecution() { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); DAGStatus.State lastState = null; - String lastReport = null; boolean running = false; while (true) { @@ -195,13 +167,13 @@ public int monitorExecution() { this.executionStartTime = System.currentTimeMillis(); running = true; } - lastReport = updateStatus(status, vertexProgressMap, lastReport); + updateFunction.update(status, vertexProgressMap); break; case SUCCEEDED: if (!running) { this.executionStartTime = monitorStartTime; } - lastReport = updateStatus(status, vertexProgressMap, lastReport); + updateFunction.update(status, vertexProgressMap); success = true; running = false; done = true; @@ -210,7 +182,7 @@ public int monitorExecution() { if (!running) { this.executionStartTime = monitorStartTime; } - lastReport = updateStatus(status, vertexProgressMap, lastReport); + updateFunction.update(status, vertexProgressMap); console.printInfo("Status: Killed"); running = false; done = true; @@ -221,7 +193,7 @@ public int monitorExecution() { if (!running) { this.executionStartTime = monitorStartTime; } - lastReport = updateStatus(status, vertexProgressMap, lastReport); + updateFunction.update(status, vertexProgressMap); console.printError("Status: Failed"); running = false; done = true; @@ -323,71 +295,11 @@ static long getCounterValueByGroupName(TezCounters vertexCounters, String groupN return (tezCounter == null) ? 0 : tezCounter.getValue(); } - private String updateStatus(DAGStatus status, Map vertexProgressMap, - String lastReport) { - String report = getReport(vertexProgressMap); - if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) { - updateFunction.update(status, vertexProgressMap, report); - lastPrintTime = System.currentTimeMillis(); - } - return report; - } - - private String getReport(Map progressMap) { - StringBuilder reportBuffer = new StringBuilder(); - - SortedSet keys = new TreeSet(progressMap.keySet()); - for (String s : keys) { - Progress progress = progressMap.get(s); - final int complete = progress.getSucceededTaskCount(); - final int total = progress.getTotalTaskCount(); - final int running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskAttemptCount(); - if (total <= 0) { - reportBuffer.append(String.format("%s: -/-\t", s)); - } else { - if (complete == total) { - /* - * We may have missed the start of the vertex due to the 3 seconds interval - */ - if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - if (complete < total && (complete > 0 || running > 0 || failed > 0)) { - - if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - - /* vertex is started, but not complete */ - if (failed > 0) { - reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total)); - } else { - reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total)); - } - } else { - /* vertex is waiting for input/slots or complete */ - if (failed > 0) { - /* tasks finished but some failed */ - reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total)); - } else { - reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total)); - } - } - } - } - - return reportBuffer.toString(); - } - public String getDiagnostics() { return diagnostics.toString(); } - private ProgressMonitor progressMonitor(DAGStatus status, Map progressMap) { + ProgressMonitor progressMonitor(DAGStatus status, Map progressMap) { try { return new TezProgressMonitor(dagClient, status, workMap, progressMap, console, executionStartTime);