diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 66de3b805a..5cadcb44cf 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -992,8 +992,12 @@ public void dagComplete() { } totalGuaranteed = unusedGuaranteed = 0; - LOG.info( - "DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}", + if (metrics != null) { + metrics.setDagId(null); + // We remove the tasks above without state checks so just reset all metrics to 0. + metrics.resetWmMetrics(); + } + LOG.info("DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}", knownTasks.size(), pendingCount, runningCount); } finally { writeLock.unlock(); @@ -1019,12 +1023,16 @@ public void unblacklistNode(NodeId nodeId) { @Override public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, Priority priority, Object containerSignature, Object clientCookie) { + TezTaskAttemptID id = getTaskAttemptId(task); TaskInfo taskInfo = new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, - capability, hosts, racks, clock.getTime(), getTaskAttemptId(task)); - LOG.info("Received allocateRequest. task={}, priority={}, capability={}, hosts={}", task, - priority, capability, Arrays.toString(hosts)); + capability, hosts, racks, clock.getTime(), id); + LOG.info("Received allocateRequest. task={}, priority={}, capability={}, hosts={}", + task, priority, capability, Arrays.toString(hosts)); writeLock.lock(); try { + if (!dagRunning && metrics != null && id != null) { + metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); + } dagRunning = true; dagStats.registerTaskRequest(hosts, racks); } finally { @@ -1039,13 +1047,16 @@ public void allocateTask(Object task, Resource capability, ContainerId container Priority priority, Object containerSignature, Object clientCookie) { // Container affinity can be implemented as Host affinity for LLAP. Not required until // 1:1 edges are used in Hive. - TaskInfo taskInfo = - new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, - null, clock.getTime(), getTaskAttemptId(task)); - LOG.info("Received allocateRequest. task={}, priority={}, capability={}, containerId={}", task, - priority, capability, containerId); + TezTaskAttemptID id = getTaskAttemptId(task); + TaskInfo taskInfo = new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, + capability, null, null, clock.getTime(), id); + LOG.info("Received allocateRequest. task={}, priority={}, capability={}, containerId={}", + task, priority, capability, containerId); writeLock.lock(); try { + if (!dagRunning && metrics != null && id != null) { + metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); + } dagRunning = true; dagStats.registerTaskRequest(null, null); } finally { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java index 06cf7eced2..c6b5cc1770 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java @@ -40,6 +40,7 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.impl.MsInfo; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; @@ -54,6 +55,7 @@ private final JvmMetrics jvmMetrics; private final String sessionId; private final MetricsRegistry registry; + private String dagId = null; @Metric MutableGaugeInt numExecutors; @Metric @@ -112,9 +114,16 @@ public void getMetrics(MetricsCollector collector, boolean b) { .setContext("scheduler") .tag(ProcessName, "DAGAppMaster") .tag(SessionId, sessionId); + if (dagId != null) { + rb.tag(MsInfo.Context, dagId); + } getTaskSchedulerStats(rb); } + public void setDagId(String dagId) { + this.dagId = dagId; + } + public void setNumExecutors(int value) { numExecutors.set(value); } @@ -245,6 +254,15 @@ public void setWmUnusedGuaranteed(int unusedGuaranteed) { wmUnusedGuaranteedCount.set(unusedGuaranteed); } + public void resetWmMetrics() { + wmTotalGuaranteedCount.set(0); + wmUnusedGuaranteedCount.set(0); + wmGuaranteedCount.incr(-wmGuaranteedCount.value()); + wmSpeculativeCount.incr(-wmSpeculativeCount.value()); + wmGuaranteedPendingCount.incr(-wmGuaranteedPendingCount.value()); + wmSpeculativePendingCount.incr(-wmSpeculativePendingCount.value()); + } + private void getTaskSchedulerStats(MetricsRecordBuilder rb) { rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value()) .addGauge(SchedulerExecutorsPerInstance, numExecutors.value())