diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index ed8df95..11ba793 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; -import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; +import org.apache.tez.runtime.task.TaskRunner2Result; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; @@ -187,8 +187,8 @@ private boolean trySchedule(TaskRunnerCallable task) { boolean scheduled = false; try { - ListenableFuture future = executorService.submit(task); - FutureCallback wrappedCallback = + ListenableFuture future = executorService.submit(task); + FutureCallback wrappedCallback = new InternalCompletionListener(task.getCallback()); Futures.addCallback(future, wrappedCallback); @@ -252,8 +252,8 @@ private boolean trySchedule(TaskRunnerCallable task) { // try to submit the task from wait queue to executor service. If it gets rejected the // task from wait queue will hold on to its position for next try. try { - ListenableFuture future = executorService.submit(task); - FutureCallback wrappedCallback = + ListenableFuture future = executorService.submit(task); + FutureCallback wrappedCallback = new InternalCompletionListener(task.getCallback()); Futures.addCallback(future, wrappedCallback); numSlotsAvailable.decrementAndGet(); @@ -285,14 +285,14 @@ private synchronized void removeTaskFromPreemptionList(TaskRunnerCallable pReque } private synchronized void addTaskToPreemptionList(TaskRunnerCallable task, - ListenableFuture future) { + ListenableFuture future) { idToTaskMap.put(task.getRequestId(), task); preemptionMap.put(task, future); preemptionQueue.add(task); } private final class InternalCompletionListener implements - FutureCallback { + FutureCallback { private TaskRunnerCallable.TaskRunnerCallback wrappedCallback; public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedCallback) { @@ -300,7 +300,7 @@ public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedC } @Override - public void onSuccess(ContainerExecutionResult result) { + public void onSuccess(TaskRunner2Result result) { wrappedCallback.onSuccess(result); updatePreemptionListAndNotify(true); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 97a8b78..7e7c133 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -27,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -45,12 +45,10 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Logger; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezException; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.InputSpec; @@ -58,8 +56,7 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.library.input.UnorderedKVInput; -import org.apache.tez.runtime.task.TezChild; -import org.apache.tez.runtime.task.TezTaskRunner; +import org.apache.tez.runtime.task.TaskRunner2Result; import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultimap; @@ -68,12 +65,15 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.runtime.task.TezTaskRunner2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ -public class TaskRunnerCallable extends CallableWithNdc { - private static final Logger LOG = Logger.getLogger(TaskRunnerCallable.class); +public class TaskRunnerCallable extends CallableWithNdc { + private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; private final Configuration conf; private final String[] localDirs; @@ -88,7 +88,7 @@ private final AMReporter amReporter; private final ConcurrentMap sourceCompletionMap; private final TaskSpec taskSpec; - private volatile TezTaskRunner taskRunner; + private volatile TezTaskRunner2 taskRunner; private volatile TaskReporterInterface taskReporter; private volatile ListeningExecutorService executor; private LlapTaskUmbilicalProtocol umbilical; @@ -126,7 +126,7 @@ } @Override - protected TezChild.ContainerExecutionResult callInternal() throws Exception { + protected TaskRunner2Result callInternal() throws Exception { this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); if (LOG.isDebugEnabled()) { @@ -177,43 +177,29 @@ public LlapTaskUmbilicalProtocol run() throws Exception { new AtomicLong(0), request.getContainerIdString()); - taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, + taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs, taskSpec, request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memoryAvailable); - boolean shouldDie; try { - shouldDie = !taskRunner.run(); - if (shouldDie) { - LOG.info("Got a shouldDie notification via heartbeats. Shutting down"); - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE, null, - "Asked to die by the AM"); + TaskRunner2Result result = taskRunner.run(); + if (result.isContainerShutdownRequested()) { + LOG.warn("Unexpected container shutdown requested while running task. Ignoring"); } - } catch (IOException e) { - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); - } catch (TezException e) { - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); + return result; + } finally { // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now. // FileSystem.closeAllForUGI(taskUgi); + LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + + sw.stop().elapsed(TimeUnit.MILLISECONDS)); + if (LOG.isDebugEnabled()) { + LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + } } - LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + - sw.stop().elapsedMillis()); - if (LOG.isDebugEnabled()) { - LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); - } - - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, - null); } /** @@ -311,7 +297,7 @@ public TaskRunnerCallback getCallback() { return new TaskRunnerCallback(request, this); } - final class TaskRunnerCallback implements FutureCallback { + final class TaskRunnerCallback implements FutureCallback { private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; private final TaskRunnerCallable taskRunnerCallable; @@ -330,25 +316,29 @@ public String getRequestId() { // TODO Slightly more useful error handling @Override - public void onSuccess(TezChild.ContainerExecutionResult result) { - switch (result.getExitStatus()) { + public void onSuccess(TaskRunner2Result result) { + switch(result.getEndReason()) { + // Only the KILLED case requires a message to be sent out to the AM. case SUCCESS: - LOG.info("Successfully finished: " + requestId); + LOG.info("Successfully finished {}", requestId); metrics.incrExecutorTotalSuccess(); break; - case EXECUTION_FAILURE: - LOG.info("Failed to run: " + requestId); - metrics.incrExecutorTotalExecutionFailed(); + case CONTAINER_STOP_REQUESTED: + LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId); break; - case INTERRUPTED: - LOG.info("Interrupted while running: " + requestId); - metrics.incrExecutorTotalInterrupted(); + case KILL_REQUESTED: + // TODO Send a kill out to the AM. break; - case ASKED_TO_DIE: - LOG.info("Asked to die while running: " + requestId); - metrics.incrExecutorTotalAskedToDie(); + case COMMUNICATION_FAILURE: + LOG.info("Failed to run {} due to communication failure", requestId); + metrics.incrExecutorTotalExecutionFailed(); + break; + case TASK_ERROR: + LOG.info("Failed to run {} due to task error", requestId); + metrics.incrExecutorTotalExecutionFailed(); break; } + taskRunnerCallable.shutdown(); HistoryLogger .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 44a4633..95750c4 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -35,6 +35,8 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.task.EndReason; +import org.apache.tez.runtime.task.TaskRunner2Result; import org.apache.tez.runtime.task.TezChild; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult.ExitStatus; @@ -58,10 +60,10 @@ public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto, } @Override - protected TezChild.ContainerExecutionResult callInternal() throws Exception { + protected TaskRunner2Result callInternal() throws Exception { System.out.println(requestId + " is executing.."); Thread.sleep(workTime); - return new ContainerExecutionResult(ExitStatus.SUCCESS, null, null); + return new TaskRunner2Result(EndReason.SUCCESS, null, false); } @Override