diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
new file mode 100644
index 0000000..8b481c8
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public interface KilledTaskHandler {
+
+  // TODO Ideally, this should only need to send in the TaskAttemptId. Everything else should be
+  // inferred from this.
+  // Passing in parameters until there's some dag information stored and tracked in the daemon.
+  void taskKilled(String amLocation, int port, String user,
+      Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId);
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 3dc8fb8..39b3634 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -46,11 +46,12 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Sends status updates to various AMs.
+ * Responsible for communicating with various AMs.
  */
 public class AMReporter extends AbstractService {
 
@@ -80,6 +81,8 @@
   private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue<>();
   private final long heartbeatInterval;
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  // Tracks appMasters to which heartbeats are being sent. This should not be used for any other
+  // messages like taskKilled, etc.
  private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
  volatile ListenableFuture<Void> queueLookupFuture;

@@ -167,6 +170,28 @@ public void unregisterTask(String amLocation, int port) {
     }
   }
 
+  public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
+      final TezTaskAttemptID taskAttemptId) {
+    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
+    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
+    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
+    AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+    ListenableFuture<Void> future =
+        executor.submit(new KillTaskCallable(taskAttemptId, amNodeInfo));
+    Futures.addCallback(future, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        LOG.info("Sent taskKilled for {}", taskAttemptId);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.",
+            taskAttemptId, t);
+      }
+    });
+  }
+
   private class QueueLookupCallable extends CallableWithNdc<Void> {
 
     @Override
@@ -199,6 +224,31 @@ protected Void callInternal() {
     }
   }
 
+  private class KillTaskCallable extends CallableWithNdc<Void> {
+    final AMNodeInfo amNodeInfo;
+    final TezTaskAttemptID taskAttemptId;
+
+    public KillTaskCallable(TezTaskAttemptID taskAttemptId,
+        AMNodeInfo amNodeInfo) {
+      this.taskAttemptId = taskAttemptId;
+      this.amNodeInfo = amNodeInfo;
+    }
+
+    @Override
+    protected Void callInternal() {
+      try {
+        amNodeInfo.getUmbilical().taskKilled(taskAttemptId);
+      } catch (IOException e) {
+        LOG.warn("Failed to send taskKilled message for task {}. Will re-run after it times out", taskAttemptId, e);
+      } catch (InterruptedException e) {
+        if (!isShutdown.get()) {
+          LOG.info("Interrupted while trying to send taskKilled message for task {}", taskAttemptId);
+        }
+      }
+      return null;
+    }
+  }
+
   private class AMHeartbeatCallable extends CallableWithNdc<Void> {
 
     final AMNodeInfo amNodeInfo;
@@ -224,7 +274,7 @@ protected Void callInternal() {
         LOG.warn("Failed to communicate with AM. May retry later: " + amNodeInfo.amNodeId, e);
       } catch (InterruptedException e) {
         if (!isShutdown.get()) {
-          LOG.warn("Failed to communicate with AM: " + amNodeInfo.amNodeId, e);
+          LOG.warn("Interrupted while trying to send heartbeat to AM: " + amNodeInfo.amNodeId, e);
         }
       }
     } else {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index c9e5829..e544789 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
@@ -72,6 +73,7 @@
   private final LlapDaemonExecutorMetrics metrics;
   private final Configuration conf;
   private final TaskRunnerCallable.ConfParams confParams;
+  private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl();
 
   // Map of dagId to vertices and associated state.
  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
@@ -110,7 +112,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi
     );
     LOG.info("ContainerRunnerImpl config: " +
-        "memoryPerExecutorDerviced=" + memoryPerExecutor
+        "memoryPerExecutorDerived=" + memoryPerExecutor
     );
   }
@@ -193,7 +195,7 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException {
     ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName());
     TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
         new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
-        credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics);
+        credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics, killedTaskHandler);
     executorService.schedule(callable);
     metrics.incrExecutorTotalRequestsHandled();
     metrics.incrExecutorNumQueuedRequests();
@@ -219,11 +221,6 @@
   public void terminateFragment(TerminateFragmentRequestProto request) {
     // TODO Implement when this gets used.
   }
-
-  private void notifyAMOfRejection(TaskRunnerCallable callable) {
-    LOG.error("Notifying AM of request rejection is not implemented yet!");
-  }
-
   private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
     StringBuilder sb = new StringBuilder();
     sb.append("dagName=").append(request.getDagName())
@@ -294,4 +291,13 @@ public static String stringifySubmitRequest(SubmitWorkRequestProto request) {
     }
     return dagMap;
   }
+
+  private class KilledTaskHandlerImpl implements KilledTaskHandler {
+
+    @Override
+    public void taskKilled(String amLocation, int port, String user,
+        Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId) {
+      amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId);
+    }
+  }
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 16d745b..5c8116e 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -61,6 +61,8 @@ void registerFragment(String queryId, String appIdString, String dagName, int da
       queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user);
       queryInfoMap.putIfAbsent(dagName, queryInfo);
     }
+    // TODO Start tracking individual fragments, so that taskKilled etc messages
+    // can be routed through this layer to simplify the interfaces.
   }
 
   String[] getLocalDirs(String queryId, String dagName, String user) throws IOException {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 11ba793..42a3528 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -30,7 +30,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.log4j.Logger;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -40,6 +39,8 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Task executor service provides method for scheduling tasks. Tasks submitted to executor service
@@ -61,7 +62,7 @@
  * new tasks. Shutting down of the task executor service can be done gracefully or immediately.
  */
 public class TaskExecutorService implements Scheduler {
-  private static final Logger LOG = Logger.getLogger(TaskExecutorService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   private static final boolean isTraceEnabled = LOG.isTraceEnabled();
@@ -183,7 +184,7 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException
     }
   }
 
-  private boolean trySchedule(TaskRunnerCallable task) {
+  private boolean trySchedule(final TaskRunnerCallable task) {
 
     boolean scheduled = false;
     try {
@@ -236,7 +237,9 @@ private boolean trySchedule(TaskRunnerCallable task) {
             LOG.debug("Pre-emption invoked for " + pRequest.getRequestId() + " by interrupting the thread.");
           }
-          pFuture.cancel(true);
+          pRequest.killTask();
+          // TODO: Ideally, we should wait for the thread to complete and fall off before assuming
+          // the slot is available for the next task.
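+          // killTask() interrupts the victim's runner (if it has started) and also reports the
+          // kill to the AM, which is why the old notifyAM() call below is no longer needed.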
          removeTaskFromPreemptionList(pRequest, pRequest.getRequestId());
 
         // future is cancelled or completed normally, in which case schedule the new request
@@ -244,8 +247,6 @@
         if (isDebugEnabled) {
           LOG.debug(pRequest.getRequestId() + " request preempted by " + task.getRequestId());
         }
-
-        notifyAM(pRequest);
       }
     }
@@ -333,11 +334,6 @@ private void updatePreemptionListAndNotify(boolean success) {
   }
 
-  private void notifyAM(TaskRunnerCallable request) {
-    // TODO: Report to AM of pre-emption and rejection
-    LOG.info("Notifying to AM of preemption is not implemented yet!");
-  }
-
   // TODO: llap daemon should call this to gracefully shutdown the task executor service
   public void shutDown(boolean awaitTermination) {
     if (awaitTermination) {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 7e7c133..b16a5c4 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -56,6 +57,7 @@
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 
 import com.google.common.base.Stopwatch;
@@ -88,6 +90,7 @@
   private final AMReporter amReporter;
   private final ConcurrentMap<String, SourceStateProto> sourceCompletionMap;
   private final TaskSpec taskSpec;
+  private final KilledTaskHandler killedTaskHandler;
   private volatile TezTaskRunner2 taskRunner;
   private volatile TaskReporterInterface taskReporter;
   private volatile ListeningExecutorService executor;
@@ -96,13 +99,17 @@
   private volatile String threadName;
   private LlapDaemonExecutorMetrics metrics;
   protected String requestId;
+  private boolean shouldRunTask = true;
+  final Stopwatch runtimeWatch = new Stopwatch();
+  final Stopwatch killtimerWatch = new Stopwatch();
 
   TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf,
       ExecutionContext executionContext, Map<String, String> envMap,
      String[] localDirs, Credentials credentials,
      long memoryAvailable, AMReporter amReporter,
      ConcurrentMap<String, SourceStateProto> sourceCompletionMap,
-      ConfParams confParams, LlapDaemonExecutorMetrics metrics) {
+      ConfParams confParams, LlapDaemonExecutorMetrics metrics,
+      KilledTaskHandler killedTaskHandler) {
     this.request = request;
     this.conf = conf;
     this.executionContext = executionContext;
@@ -123,6 +130,7 @@
     }
     this.metrics = metrics;
     this.requestId = getTaskAttemptId(request);
+    this.killedTaskHandler = killedTaskHandler;
   }
 
   @Override
@@ -146,7 +154,7 @@ protected TaskRunner2Result callInternal() throws Exception {
     executor = MoreExecutors.listeningDecorator(executorReal);
 
     // TODO Consolidate this code with TezChild.
-    Stopwatch sw = new Stopwatch().start();
+    runtimeWatch.start();
     UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser());
     taskUgi.addCredentials(credentials);
@@ -177,12 +185,21 @@ public LlapTaskUmbilicalProtocol run() throws Exception {
         new AtomicLong(0),
         request.getContainerIdString());
 
-    taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
-        taskSpec,
-        request.getAppAttemptNumber(),
-        serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
-        pid,
-        executionContext, memoryAvailable);
+    synchronized (this) {
+      if (shouldRunTask) {
+        taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
+            taskSpec,
+            request.getAppAttemptNumber(),
+            serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+            objectRegistry,
+            pid,
+            executionContext, memoryAvailable);
+      }
+    }
+    if (taskRunner == null) {
+      LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+      return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+    }
 
     try {
       TaskRunner2Result result = taskRunner.run();
@@ -195,7 +212,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception {
       // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now.
       // FileSystem.closeAllForUGI(taskUgi);
       LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-          sw.stop().elapsed(TimeUnit.MILLISECONDS));
+          runtimeWatch.stop().elapsed(TimeUnit.MILLISECONDS));
       if (LOG.isDebugEnabled()) {
         LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
       }
@@ -203,6 +220,35 @@ public LlapTaskUmbilicalProtocol run() throws Exception {
   }
 
   /**
+   * Attempt to kill a running task. If the task has not started running, it will not start.
+   * If it's already running, a kill request will be sent to it.
+   *
+   * The AM will be informed about the task kill.
+   */
+  public void killTask() {
+    synchronized (this) {
+      LOG.info("Killing task with id {}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), (taskRunner != null));
+      shouldRunTask = false;
+      if (taskRunner != null) {
+        killtimerWatch.start();
+        LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
+        taskRunner.killTask();
+      }
+    }
+    // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
+    reportTaskKilled();
+  }
+
+  /**
+   * Inform the AM that this task has been killed.
+   */
+  public void reportTaskKilled() {
+    killedTaskHandler
+        .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken,
+            taskSpec.getTaskAttemptID());
+  }
+
+  /**
    * Check whether a task can run to completion or may end up blocking on its sources.
    * This currently happens via looking up source state.
    * TODO: Eventually, this should lookup the Hive Processor to figure out whether
@@ -314,7 +360,8 @@ public String getRequestId() {
     return requestId;
   }
 
-  // TODO Slightly more useful error handling
+  // Errors are handled on the way over. FAIL/SUCCESS is reported via regular heartbeats; KILLED
+  // is reported via a kill message when the daemon requests a task kill.
   @Override
   public void onSuccess(TaskRunner2Result result) {
     switch(result.getEndReason()) {
@@ -327,7 +374,14 @@ public void onSuccess(TaskRunner2Result result) {
         LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId);
         break;
       case KILL_REQUESTED:
-        // TODO Send a kill out to the AM.
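+        // The kill was requested by the daemon itself (currently only preemption does this), and
+        // the AM has already been notified via killTask(); only accounting remains here.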
+        LOG.info("Killed task {}", requestId);
+        if (killtimerWatch.isRunning()) {
+          killtimerWatch.stop();
+          long elapsed = killtimerWatch.elapsed(TimeUnit.MILLISECONDS);
+          LOG.info("Time to die for task {}: {} ms", requestId, elapsed);
+        }
+        metrics.incrPreemptionTimeLost(runtimeWatch.elapsed(TimeUnit.MILLISECONDS));
+        metrics.incrExecutorTotalKilled();
         break;
       case COMMUNICATION_FAILURE:
         LOG.info("Failed to run {} due to communication failure", requestId);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index d7bed53..e4739dc 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -33,7 +33,8 @@
   ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"),
   ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"),
   ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"),
-  ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die");
+  ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"),
+  PreemptionTimeLost("Total time lost due to task preemptions");
 
   private final String desc;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 22c9fe0..33b8f9d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -26,6 +26,7 @@
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -72,11 +73,12 @@
   @Metric
   MutableCounterLong executorTotalSuccess;
   @Metric
-  MutableCounterLong executorTotalInterrupted;
+  MutableCounterLong executorTotalKilled;
   @Metric
   MutableCounterLong executorTotalExecutionFailed;
   @Metric
-  MutableCounterLong executorTotalAskedToDie;
+  MutableCounterLong preemptionTimeLost;
+
 
   private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
       int numExecutors) {
@@ -141,14 +143,15 @@ public void incrExecutorTotalExecutionFailed() {
     executorTotalExecutionFailed.incr();
   }
 
-  public void incrExecutorTotalInterrupted() {
-    executorTotalInterrupted.incr();
+  public void incrPreemptionTimeLost(long value) {
+    preemptionTimeLost.incr(value);
   }
 
-  public void incrExecutorTotalAskedToDie() {
-    executorTotalAskedToDie.incr();
+  public void incrExecutorTotalKilled() {
+    executorTotalKilled.incr();
   }
 
+
   private void getExecutorStats(MetricsRecordBuilder rb) {
     updateThreadMetrics(rb);
 
@@ -156,8 +159,8 @@ private void getExecutorStats(MetricsRecordBuilder rb) {
         .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value())
         .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value())
        .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value())
-        .addCounter(ExecutorTotalInterrupted, executorTotalInterrupted.value())
-        .addCounter(ExecutorTotalAskedToDie, executorTotalAskedToDie.value());
+        .addCounter(ExecutorTotalInterrupted, executorTotalKilled.value())
+        .addCounter(PreemptionTimeLost, preemptionTimeLost.value());
   }
 
   private void updateThreadMetrics(MetricsRecordBuilder rb) {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index 886194a..2f5e11d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -34,4 +34,6 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
 
   public void nodeHeartbeat(Text hostname, int port);
 
+  public void taskKilled(TezTaskAttemptID taskAttemptId);
+
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 99459e4..d614548 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -278,9 +278,9 @@ public void indicateError(Throwable t) {
   public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
     super.unregisterRunningTaskAttempt(taskAttemptID);
     entityTracker.unregisterTaskAttempt(taskAttemptID);
-    // TODO Inform the daemon that this task is no longer running.
-    // Currently, a task will end up moving into the RUNNING queue and will
-    // be told that it needs to die since it isn't recognized.
+    // This will also be invoked for tasks which have been KILLED / rejected by the daemon.
+    // Informing the daemon becomes necessary once the LlapScheduler supports preemption
+    // and/or starts attempting to kill tasks which may be running on a node.
   }
 
   @Override
@@ -407,6 +407,14 @@ public void nodeHeartbeat(Text hostname, int port) {
   }
 
   @Override
+  public void taskKilled(TezTaskAttemptID taskAttemptId) {
+    // TODO Unregister the task for state updates, which could in turn unregister the node.
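+    // INTERRUPTED_BY_SYSTEM marks the attempt as killed by the framework (preempted) rather
+    // than failed, so the AM can reschedule it; the entityTracker entry is dropped below.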
+    getTaskCommunicatorContext().taskKilled(taskAttemptId,
+        TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM, "Attempt preempted");
+    entityTracker.unregisterTaskAttempt(taskAttemptId);
+  }
+
+  @Override
   public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
     return versionID;
   }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 95750c4..3b89b48 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
@@ -54,7 +56,7 @@
     public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto,
         boolean canFinish, int workTime) {
       super(requestProto, conf, new ExecutionContextImpl("localhost"), null, null, cred, 0, null,
-          null, null, null);
+          null, null, null, mock(KilledTaskHandler.class));
       this.workTime = workTime;
      this.canFinish = canFinish;
    }
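Reviewer note: the subtle piece of this patch is the start/kill handshake in TaskRunnerCallable. A kill that arrives before the TezTaskRunner2 is created must prevent the task from ever starting (callInternal() then returns a KILL_REQUESTED result immediately), while a kill that arrives after creation must be forwarded to the running task. Below is a minimal, self-contained sketch of that pattern; the class and method names in it are illustrative stand-ins, not APIs from this patch.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StartKillHandshakeSketch {

  // Illustrative stand-in for TezTaskRunner2: runs until asked to stop.
  static class Runner {
    private final AtomicBoolean killed = new AtomicBoolean(false);

    String run() throws InterruptedException {
      while (!killed.get()) {
        TimeUnit.MILLISECONDS.sleep(5); // simulated work
      }
      return "KILL_REQUESTED";
    }

    void kill() {
      killed.set(true);
    }
  }

  private boolean shouldRunTask = true; // guarded by 'this'
  private Runner runner;                // created under 'this', like taskRunner in the patch

  // Mirrors callInternal(): create the runner only if no kill won the race.
  public String call() throws InterruptedException {
    synchronized (this) {
      if (shouldRunTask) {
        runner = new Runner();
      }
    }
    if (runner == null) {
      return "KILLED_BEFORE_START"; // never started, like the early KILL_REQUESTED return
    }
    return runner.run();
  }

  // Mirrors killTask(): flip the flag under the same lock that guards runner creation.
  public void killTask() {
    synchronized (this) {
      shouldRunTask = false;
      if (runner != null) {
        runner.kill();
      }
    }
    // The patch informs the AM at this point via KilledTaskHandler.taskKilled(...).
  }

  public static void main(String[] args) throws Exception {
    StartKillHandshakeSketch task = new StartKillHandshakeSketch();
    Thread t = new Thread(() -> {
      try {
        System.out.println("result: " + task.call());
      } catch (InterruptedException ignored) {
      }
    });
    t.start();
    TimeUnit.MILLISECONDS.sleep(50); // let the task start
    task.killTask();                 // kill while running -> prints "result: KILL_REQUESTED"
    t.join();
  }
}

Because killTask() flips shouldRunTask under the same monitor that guards runner creation, exactly one side of the race wins: either the kill lands first and the runner is never constructed, or the runner exists and receives the kill directly.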