diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 7d0d6d2..c7de417 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -47,6 +48,8 @@
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
@@ -85,15 +88,19 @@
   private LlapTaskUmbilicalExternalResponder responder = null;
   private final ScheduledThreadPoolExecutor timer;
   private final long connectionTimeout;
+  // Set by serviceStop(); once true, heartbeat() answers with shouldDie.
+  private volatile boolean closed = false;
 
   private static class TaskHeartbeatInfo {
+    final QueryIdentifierProto queryIdentifierProto;
     final String taskAttemptId;
     final String hostname;
     String uniqueNodeId;
     final int port;
     final AtomicLong lastHeartbeat = new AtomicLong();
 
-    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+    public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) {
+      this.queryIdentifierProto = queryIdentifierProto;
       this.taskAttemptId = taskAttemptId;
       this.hostname = hostname;
       this.port = port;
@@ -137,7 +144,56 @@ public void serviceStart() throws IOException {
   }
 
+  /**
+   * Closes this client: marks it closed, requests termination of every registered fragment,
+   * and schedules the deferred shutdown of the umbilical server.
+   *
+   * @throws IllegalStateException if the client has already been closed
+   */
   @Override
-  public void serviceStop() {
+  public void serviceStop() throws Exception {
+    if (closed) {
+      throw new IllegalStateException("Client has already been closed");
+    }
+    closed = true;
+
+    // Check if the request is registered - if so we can cancel the request
+    for (Map.Entry<String, TaskHeartbeatInfo> taskEntry : registeredTasks.entrySet()) {
+      terminateRequest(taskEntry.getValue());
+    }
+    registeredTasks.clear();
+
+    scheduleClientForCleanup(this);
+  }
+
+  /**
+   * Asks the daemon running the given task to terminate the fragment. The outcome is
+   * reported through a callback; a failure is logged and not propagated.
+   */
+  private void terminateRequest(TaskHeartbeatInfo thi) {
+    TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder();
+    builder.setQueryIdentifier(thi.queryIdentifierProto);
+    builder.setFragmentIdentifierString(thi.taskAttemptId);
+
+    final String taskAttemptId = thi.taskAttemptId;
+    communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+
+      @Override
+      public void setResponse(TerminateFragmentResponseProto response) {
+        LOG.debug("Received terminate response for " + taskAttemptId);
+      }
+
+      @Override
+      public void indicateError(Throwable t) {
+        String msg = "Failed to terminate " + taskAttemptId;
+        LOG.error(msg, t);
+        // Don't propagate the error - termination was done as part of closing the client.
+      }
+    });
+  }
+
+  /** Releases the client's resources; run by ClientCleanupTask once no connections remain. */
+  private void doShutdown() throws IOException {
     llapTaskUmbilicalServer.shutdownServer();
     timer.shutdown();
     if (this.communicator != null) {
@@ -170,7 +226,7 @@ public void submitWork(SubmitWorkRequestProto request, String llapHost, int llap
         vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
     final String fragmentId = attemptId.toString();
 
-    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(fragmentId, llapHost, llapPort);
+    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
     pendingEvents.putIfAbsent(
         fragmentId,
         new PendingEventData(thi, Lists.newArrayList()));
@@ -357,6 +413,13 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
       String taskAttemptIdString = taskAttemptId.toString();
 
+      if (closed) {
+        LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString);
+        // Set shouldDie response so the LLAP daemon closes this umbilical connection.
+        response.setShouldDie();
+        return response;
+      }
+
       updateHeartbeatInfo(taskAttemptIdString);
 
       List tezEvents = null;
@@ -456,4 +519,57 @@ public ProtocolSignature getProtocolSignature(String protocol, long clientVersio
     }
   }
 
+  /**
+   * Schedules a {@link ClientCleanupTask} for the given client, to run after
+   * {@code cleanupDelay} milliseconds.
+   */
+  private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) {
+    // Add a bit of delay in case the daemon has not closed the umbilical connection yet.
+    clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS);
+  }
+
+  // The single worker is a daemon thread: the default thread factory creates non-daemon
+  // threads, and an idle core thread of this static pool would keep the JVM from exiting.
+  static final ScheduledThreadPoolExecutor clientCleanupExecuter =
+      new ScheduledThreadPoolExecutor(1, new java.util.concurrent.ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable task) {
+          Thread cleanupThread = new Thread(task, "LlapExternalClientCleanup");
+          cleanupThread.setDaemon(true);
+          return cleanupThread;
+        }
+      });
+  // Delay, in milliseconds, before each cleanup attempt.
+  static final int cleanupDelay = 2000;
+
+  /**
+   * Shuts a client down once its umbilical server reports no open connections;
+   * otherwise reschedules itself to check again later.
+   */
+  static class ClientCleanupTask implements Runnable {
+    final LlapTaskUmbilicalExternalClient client;
+
+    public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void run() {
+      // Keep the whole body inside the try: an exception escaping a scheduled task is
+      // captured in its unobserved future and would otherwise vanish without a log entry.
+      try {
+        if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) {
+          // No more outstanding connections, ok to close.
+          LOG.debug("Closing client");
+          client.doShutdown();
+        } else {
+          // Reschedule this task for later.
+          LOG.debug("Client still has umbilical connection - rescheduling cleanup.");
+          scheduleClientForCleanup(client);
+        }
+      } catch (Exception err) {
+        LOG.error("Error cleaning up client", err);
+      }
+    }
+  }
 }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 470ee6d..403381d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -72,6 +72,11 @@ public InetSocketAddress getAddress() {
     return this.address;
   }
 
+  /** Returns the number of open connections reported by the underlying server. */
+  public int getNumOpenConnections() {
+    return server.getNumOpenConnections();
+  }
+
   public void shutdownServer() {
     if (started.get()) { // Primarily to avoid multiple shutdowns.
       started.set(false);