diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 1f25f76..ef596ac 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -231,7 +231,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t eventsToSend.drainTo(events); if (!task.isTaskDone() && !task.hadFatalError()) { - TezCounters counters = null; + boolean sendCounters = false; /** * Increasing the heartbeat interval can delay the delivery of events. Sending just updated * records would save CPU in DAG AM, but certain counters are updated very frequently. Until @@ -239,11 +239,10 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t */ // Not completely accurate, since OOB heartbeats could go out. if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) { - counters = task.getCounters(); + sendCounters = true; prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get(); } - updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()), - updateEventMetadata); + updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata); events.add(updateEvent); } @@ -321,13 +320,17 @@ private void maybeLogCounters() { * indicates an exception somewhere in the AM. 
*/ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), - task.getProgress()), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata); return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; } + private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { + return new TaskStatusUpdateEvent((sendCounters ? task.getCounters() : null), + task.getProgress(), task.getTaskStatistics()); + } + /** * Sends out final events for task failure. * @param taskAttemptID @@ -342,8 +345,7 @@ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException */ private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), - task.getProgress()), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); if (diagnostics == null) { diagnostics = ExceptionUtils.getStackTrace(t); } else { @@ -382,18 +384,18 @@ public void onFailure(Throwable t) { } } - public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { + public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { return currentCallable.taskSucceeded(taskAttemptID); } @Override - public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, + public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { return 
currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); } @Override - public void addEvents(TezTaskAttemptID taskAttemptID, Collection events) { + public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection events) { currentCallable.addEvents(taskAttemptID, events); } diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 3a827c3..8100ece 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -312,7 +312,7 @@ public int getClusterNodeCount() { } @Override - public void resetMatchLocalityForAllHeldContainers() { + public void dagComplete() { // This is effectively DAG completed, and can be used to reset statistics being tracked. LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats); dagCounter.incrementAndGet(); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index 9fd4ba6..ac8dcba 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -30,6 +30,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; public class MiniLlapCluster extends AbstractService { private static final Log LOG = LogFactory.getLog(MiniLlapCluster.class); @@ -138,6 +139,10 @@ public void serviceStart() { numExecutorsPerService); clusterSpecificConfiguration.setLong( LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService); + // Optimized local fetch does not work with LLAP due to the different local directories
+ // used by containers and LLAP + clusterSpecificConfiguration + .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); } @Override