diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 3356dc9..7ee6186 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -194,7 +194,7 @@ public int execute(DriverContext driverContext) { additionalLr, inputOutputJars, inputOutputLocalResources); // finally monitor will print progress until the job is done - TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap(),dagClient, conf, dag, ctx); + TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(),dagClient, conf, dag, ctx); rc = monitor.monitorExecution(); if (rc != 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 049d7fd..9e2846c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -68,7 +68,7 @@ Licensed to the Apache Software Foundation (ASF) under one private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final List shutdownList; - private final Map workMap; + private final List topSortedWorks; transient LogHelper console; @@ -101,9 +101,9 @@ public static void initShutdownHook() { private long executionStartTime = 0; private final RenderStrategy.UpdateFunction updateFunction; - public TezJobMonitor(Map workMap, final DAGClient dagClient, HiveConf conf, DAG dag, + public TezJobMonitor(List topSortedWorks, final DAGClient dagClient, HiveConf conf, DAG dag, Context ctx) { - this.workMap = workMap; + this.topSortedWorks = topSortedWorks; this.dagClient = dagClient; this.hiveConf = conf; this.dag = dag; @@ -323,7 +323,7 @@ public String getDiagnostics() { ProgressMonitor progressMonitor(DAGStatus status, Map progressMap) { try { - return new TezProgressMonitor(dagClient, status, workMap, progressMap, console, + return new TezProgressMonitor(dagClient, status, topSortedWorks, progressMap, console, executionStartTime); } catch (IOException | TezException e) { console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java index 9739ad7..fa0a40f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java @@ -42,7 +42,7 @@ class TezProgressMonitor implements ProgressMonitor { private static final int COLUMN_1_WIDTH = 16; - private final Map workMap; + private final List topSortedWork; private final SessionState.LogHelper console; private final long executionStartTime; private final DAGStatus status; @@ -53,11 +53,11 @@ * Try to get most the data required from dagClient in the constructor itself so that even after * the tez job has finished this object can be used for later use.s */ - TezProgressMonitor(DAGClient dagClient, DAGStatus status, Map workMap, + TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, Map progressMap, SessionState.LogHelper console, long executionStartTime) throws IOException, TezException { this.status = status; - this.workMap = workMap; + this.topSortedWork = topSortedWork; this.console = console; this.executionStartTime = executionStartTime; for (Map.Entry entry : progressMap.entrySet()) { @@ -88,17 +88,17 @@ public List> rows() { try { List> results = new ArrayList<>(); - SortedSet keys = new TreeSet<>(progressCountsMap.keySet()); - for (String s : keys) { - VertexProgress progress = progressCountsMap.get(s); + for (BaseWork baseWork : topSortedWork) { + String vertexName = baseWork.getName(); + VertexProgress progress = progressCountsMap.get(vertexName); // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0 results.add( Arrays.asList( - getNameWithProgress(s, progress.succeededTaskCount, progress.totalTaskCount), - getMode(s, workMap), - progress.vertexStatus(vertexStatusMap.get(s)), + getNameWithProgress(vertexName, progress.succeededTaskCount, progress.totalTaskCount), + getMode(baseWork), + progress.vertexStatus(vertexStatusMap.get(vertexName)), progress.total(), progress.completed(), progress.running(), @@ -194,9 +194,8 @@ private String getNameWithProgress(String s, int complete, int total) { return result; } - private String getMode(String name, Map workMap) { + private String getMode(BaseWork work) { String mode = "container"; - BaseWork work = workMap.get(name); if (work != null) { // uber > llap > container if (work.getUberMode()) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java index 1e0fa14..98067df 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java @@ -32,6 +32,7 @@ import org.mockito.runners.MockitoJUnitRunner; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -92,7 +93,7 @@ public void setupInternalStateOnObjectCreation() throws IOException, TezExceptio when(dagClient.getVertexStatus(eq(REDUCER), anySet())).thenReturn(running); TezProgressMonitor monitor = - new TezProgressMonitor(dagClient, dagStatus, new HashMap(), progressMap(), console, + new TezProgressMonitor(dagClient, dagStatus, new ArrayList(), progressMap(), console, Long.MAX_VALUE); verify(dagClient).getVertexStatus(eq(MAPPER), isNull(Set.class));