diff --git llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java index 8287adb636..1d94244473 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.hive.llap.counters; + import java.util.concurrent.atomic.AtomicLongArray; + import org.apache.tez.common.counters.TezCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Per query counters. @@ -29,11 +33,10 @@ private LlapWmCounters currentCounter = null; private long currentCounterStartTime = 0; private final AtomicLongArray fixedCounters; - private final TezCounters tezCounters; - public WmFragmentCounters(final TezCounters tezCounters) { + public WmFragmentCounters() { + // Note: WmFragmentCounters are created before Tez counters are created. this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length); - this.tezCounters = tezCounters; } public void changeStateQueued(boolean isGuaranteed) { @@ -86,6 +89,8 @@ public void changeGuaranteed(boolean isGuaranteed) { private void changeState(State newState, LlapWmCounters counter) { + // Note: there are so many different onSuccess/onFailure callbacks floating around that + // this will probably be called twice for the done state. This is ok given the sync. long newTime = System.nanoTime(); long oldTime = -1; LlapWmCounters oldCounter = null; @@ -107,8 +112,14 @@ private void changeState(State newState, LlapWmCounters counter) { private void incrCounter(LlapWmCounters counter, long delta) { fixedCounters.addAndGet(counter.ordinal(), delta); - if (tezCounters != null) { - tezCounters.findCounter(LlapWmCounters.values()[counter.ordinal()]).increment(delta); + } + + public void dumpToTezCounters(TezCounters tezCounters, boolean isLast) { + if (isLast) { + changeStateDone(); // Record the final counters. + } + for (int i = 0; i < fixedCounters.length(); ++i) { + tezCounters.findCounter(LlapWmCounters.values()[i]).setValue(fixedCounters.get(i)); } } @@ -126,8 +137,4 @@ public String toString() { sb.append(" ]"); return sb.toString(); } - - public TezCounters getTezCounters() { - return tezCounters; - } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 8cd723d2e0..ef5922ef41 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -268,8 +268,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws Configuration callableConf = new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); - WmFragmentCounters wmCounters = new WmFragmentCounters( - FragmentCountersMap.getCountersForFragment(fragmentId)); + // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask. + WmFragmentCounters wmCounters = new WmFragmentCounters(); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index b05e0b9e43..33ade55ee1 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; +import org.apache.hadoop.hive.llap.counters.WmFragmentCounters; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; @@ -92,10 +93,12 @@ @VisibleForTesting HeartbeatCallable currentCallable; + private final WmFragmentCounters wmCounters; + public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr, final String fragmentId, TezEvent initialEvent, - String fragmentRequestId) { + String fragmentRequestId, WmFragmentCounters wmCounters) { this.umbilical = umbilical; this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -109,6 +112,7 @@ public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, heartbeatExecutor = MoreExecutors.listeningDecorator(executor); this.completionListener = completionListener; this.fragmentRequestId = fragmentRequestId; + this.wmCounters = wmCounters; } /** @@ -120,8 +124,9 @@ public synchronized void registerTask(RuntimeTask task, TezCounters tezCounters = task.addAndGetTezCounter(fragmentId); FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters); LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName()); - currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval, - maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId); + currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, + sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr, initialEvent, + fragmentRequestId, wmCounters); ListenableFuture future = heartbeatExecutor.submit(currentCallable); Futures.addCallback(future, new HeartbeatCallback(errorReporter)); } @@ -170,6 +175,7 @@ public void shutdown() { private final ReentrantLock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); + private final WmFragmentCounters wmCounters; /* * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send / @@ -188,7 +194,7 @@ public HeartbeatCallable( RuntimeTask task, LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr, - TezEvent initialEvent, String fragmentRequestId) { + TezEvent initialEvent, String fragmentRequestId, WmFragmentCounters wmCounters) { this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -198,6 +204,7 @@ public HeartbeatCallable( this.initialEvent = initialEvent; this.completionListener = completionListener; this.fragmentRequestId = fragmentRequestId; + this.wmCounters = wmCounters; this.task = task; this.umbilical = umbilical; @@ -275,7 +282,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t sendCounters = true; prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get(); } - updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata); + updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters, false), updateEventMetadata); events.add(updateEvent); } @@ -378,7 +385,8 @@ private void maybeLogCounters() { private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { // Ensure only one final event is ever sent. if (!finalEventQueued.getAndSet(true)) { - TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent( + getStatusUpdateEvent(true, true), updateEventMetadata); TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata); if (LOG.isDebugEnabled()) { @@ -392,7 +400,7 @@ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException } } - private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { + private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters, boolean isLast) { TezCounters counters = null; TaskStatistics stats = null; float progress = 0; @@ -403,6 +411,9 @@ private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { if (sendCounters) { // send these potentially large objects at longer intervals to avoid overloading the AM counters = task.getCounters(); + if (wmCounters != null && counters != null) { + wmCounters.dumpToTezCounters(counters, isLast); + } stats = task.getTaskStatistics(); } } @@ -444,7 +455,8 @@ private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled, srcMeta == null ? updateEventMetadata : srcMeta)); } try { - tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata)); + tezEvents.add(new TezEvent( + getStatusUpdateEvent(true, true), updateEventMetadata)); } catch (Exception e) { // Counter may exceed limitation LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out"); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index b484a13e48..7f436e2326 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -269,7 +269,8 @@ public LlapTaskUmbilicalProtocol run() throws Exception { request.getContainerIdString(), fragmentId, initialEvent, - requestId); + requestId, + wmCounters); String attemptId = fragmentInfo.getFragmentIdentifierString(); IOContextMap.setThreadAttemptId(attemptId);