diff --git llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java index 8287adb636..14bcace8c8 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java @@ -18,22 +18,27 @@ package org.apache.hadoop.hive.llap.counters; import java.util.concurrent.atomic.AtomicLongArray; + import org.apache.tez.common.counters.TezCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Per query counters. */ public class WmFragmentCounters { + private static final Logger LOG = LoggerFactory.getLogger(WmFragmentCounters.class); + private static enum State { NONE, QUEUED, RUNNING, DONE }; private State currentState = State.NONE; private LlapWmCounters currentCounter = null; private long currentCounterStartTime = 0; - private final AtomicLongArray fixedCounters; - private final TezCounters tezCounters; + private final long[] fixedCounters; + private TezCounters tezCounters; - public WmFragmentCounters(final TezCounters tezCounters) { - this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length); - this.tezCounters = tezCounters; + public WmFragmentCounters() { + // Note: WmFragmentCounters are created before Tez counters are created; we populate them later. + this.fixedCounters = new long[LlapWmCounters.values().length]; } public void changeStateQueued(boolean isGuaranteed) { @@ -105,10 +110,22 @@ private void changeState(State newState, LlapWmCounters counter) { } } - private void incrCounter(LlapWmCounters counter, long delta) { - fixedCounters.addAndGet(counter.ordinal(), delta); + private synchronized void incrCounter(LlapWmCounters counter, long delta) { + fixedCounters[counter.ordinal()] += delta; + LlapWmCounters e = LlapWmCounters.values()[counter.ordinal()]; + if (tezCounters != null) { + tezCounters.findCounter(e).increment(delta); + } + } + + public synchronized void setTezCounters(TezCounters counters) { if (tezCounters != null) { - tezCounters.findCounter(LlapWmCounters.values()[counter.ordinal()]).increment(delta); + LOG.warn("Tez counters are set twice, ignoring. Counter values may be incorrect."); + return; + } + tezCounters = counters; + for (int i = 0; i < LlapWmCounters.values().length; ++i) { + tezCounters.findCounter(LlapWmCounters.values()[i]).increment(fixedCounters[i]); } } @@ -117,11 +134,11 @@ public String toString() { // We rely on NDC information in the logs to map counters to attempt. // If that is not available, appId should either be passed in, or extracted from NDC. StringBuilder sb = new StringBuilder("[ "); - for (int i = 0; i < fixedCounters.length(); ++i) { + for (int i = 0; i < fixedCounters.length; ++i) { if (i != 0) { sb.append(", "); } - sb.append(LlapWmCounters.values()[i].name()).append("=").append(fixedCounters.get(i)); + sb.append(LlapWmCounters.values()[i].name()).append("=").append(fixedCounters[i]); } sb.append(" ]"); return sb.toString(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 8cd723d2e0..ef5922ef41 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -268,8 +268,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws Configuration callableConf = new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); - WmFragmentCounters wmCounters = new WmFragmentCounters( - FragmentCountersMap.getCountersForFragment(fragmentId)); + // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask. + WmFragmentCounters wmCounters = new WmFragmentCounters(); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index b05e0b9e43..43c2dc2da3 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; +import org.apache.hadoop.hive.llap.counters.WmFragmentCounters; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; @@ -92,10 +93,14 @@ @VisibleForTesting HeartbeatCallable currentCallable; + // WM counters are created at queueing time, and we need to add TezCounters to them. + // However, we can only add TezCounters at execution time. + private final WmFragmentCounters wmCounters; + public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr, final String fragmentId, TezEvent initialEvent, - String fragmentRequestId) { + String fragmentRequestId, WmFragmentCounters wmCounters) { this.umbilical = umbilical; this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -109,6 +114,7 @@ public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, heartbeatExecutor = MoreExecutors.listeningDecorator(executor); this.completionListener = completionListener; this.fragmentRequestId = fragmentRequestId; + this.wmCounters = wmCounters; } /** @@ -120,6 +126,9 @@ public synchronized void registerTask(RuntimeTask task, TezCounters tezCounters = task.addAndGetTezCounter(fragmentId); FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters); LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName()); + if (wmCounters != null) { + wmCounters.setTezCounters(tezCounters); + } currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId); ListenableFuture future = heartbeatExecutor.submit(currentCallable); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index b484a13e48..7f436e2326 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -269,7 +269,8 @@ public LlapTaskUmbilicalProtocol run() throws Exception { request.getContainerIdString(), fragmentId, initialEvent, - requestId); + requestId, + wmCounters); String attemptId = fragmentInfo.getFragmentIdentifierString(); IOContextMap.setThreadAttemptId(attemptId);