diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 7fff147..cb38839 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -58,6 +58,7 @@
   protected final long timeout;
   protected final Closeable client;
   private final Closeable socket;
+  private boolean closed = false;
 
   public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf job,
       Closeable client, Closeable socket) {
@@ -78,27 +79,31 @@ public Schema getSchema() {
   }
 
   @Override
-  public void close() throws IOException {
-    Exception caughtException = null;
-    try {
-      din.close();
-    } catch (Exception err) {
-      LOG.error("Error closing input stream:" + err.getMessage(), err);
-      caughtException = err;
-    }
-    // Don't close the socket - the stream already does that if needed.
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      closed = true;
 
-    if (client != null) {
+      Exception caughtException = null;
       try {
-        client.close();
+        din.close();
       } catch (Exception err) {
-        LOG.error("Error closing client:" + err.getMessage(), err);
-        caughtException = (caughtException == null ? err : caughtException);
+        LOG.error("Error closing input stream:" + err.getMessage(), err);
+        caughtException = err;
+      }
+      // Don't close the socket - the stream already does that if needed.
+
+      if (client != null) {
+        try {
+          client.close();
+        } catch (Exception err) {
+          LOG.error("Error closing client:" + err.getMessage(), err);
+          caughtException = (caughtException == null ? err : caughtException);
+        }
       }
-    }
 
-    if (caughtException != null) {
-      throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+      if (caughtException != null) {
+        throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+      }
     }
   }
@@ -156,28 +161,40 @@ public boolean next(NullWritable key, V value) throws IOException {
         return false;
       }
     } catch (IOException io) {
-      if (Thread.interrupted()) {
-        // Either we were interrupted by one of:
-        // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
-        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
-        // Either way we should not try to block trying to read the reader events queue.
-        if (readerEvents.isEmpty()) {
-          // Case 2.
-          throw io;
-        } else {
-          // Case 1. Fail the reader, sending back the error we received from the reader event.
-          ReaderEvent event = getReaderEvent();
-          switch (event.getEventType()) {
-          case ERROR:
-            throw new IOException("Received reader event error: " + event.getMessage(), io);
-          default:
-            throw new IOException("Got reader event type " + event.getEventType()
-                + ", expected error event", io);
+      try {
+        if (Thread.interrupted()) {
+          // Either we were interrupted by one of:
+          // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+          // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+          // Either way we should not try to block trying to read the reader events queue.
+          if (readerEvents.isEmpty()) {
+            // Case 2.
+            throw io;
+          } else {
+            // Case 1. Fail the reader, sending back the error we received from the reader event.
+            ReaderEvent event = getReaderEvent();
+            switch (event.getEventType()) {
+            case ERROR:
+              throw new IOException("Received reader event error: " + event.getMessage(), io);
+            default:
+              throw new IOException("Got reader event type " + event.getEventType()
+                  + ", expected error event", io);
+            }
           }
+        } else {
+          // If we weren't interrupted, just propagate the error
+          throw io;
+        }
+      } finally {
+        // The external client handling umbilical responses and the connection to read the incoming
+        // data are not coupled. Calling close() here to make sure an error in one will cause the
+        // other to be closed as well.
+        try {
+          close();
+        } catch (Exception err) {
+          // Don't propagate errors from close() since this will lose the original error above.
+          LOG.error("Closing RecordReader due to error and hit another error during close()", err);
         }
-      } else {
-        // If we weren't interrupted, just propagate the error
-        throw io;
       }
     }
   }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 406bdda..dde6847 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -59,7 +59,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -71,24 +70,52 @@
 import org.slf4j.LoggerFactory;
 
 
-public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable {
+public class LlapTaskUmbilicalExternalClient implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
 
   private final LlapProtocolClientProxy communicator;
   private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
   private final Configuration conf;
-  private final LlapTaskUmbilicalProtocol umbilical;
 
   protected final String tokenIdentifier;
   protected final Token<JobTokenIdentifier> sessionToken;
-
-  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
-  private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
   private LlapTaskUmbilicalExternalResponder responder = null;
-  private final ScheduledThreadPoolExecutor timer;
   private final long connectionTimeout;
 
   private volatile boolean closed = false;
+  private TaskHeartbeatInfo heartbeatInfo;
+  List<TezEvent> tezEvents;
+
+  // Using a shared instance of the umbilical server.
+  private static class SharedUmbilicalServer {
+    LlapTaskUmbilicalExternalImpl umbilicalProtocol;
+    LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+
+    private volatile static SharedUmbilicalServer instance;
+    private static final Object lock = new Object();
+
+    static SharedUmbilicalServer getInstance(Configuration conf) {
+      SharedUmbilicalServer value = instance;
+      if (value == null) {
+        synchronized (lock) {
+          if (instance == null) {
+            instance = new SharedUmbilicalServer(conf);
+            value = instance;
+          }
+        }
+      }
+      return value;
+    }
+
+    private SharedUmbilicalServer(Configuration conf) {
+      try {
+        umbilicalProtocol = new LlapTaskUmbilicalExternalImpl(conf);
+        llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilicalProtocol, 1);
+      } catch (Exception err) {
+        throw new ExceptionInInitializerError(err);
+      }
+    }
+  }
 
   private static class TaskHeartbeatInfo {
     final QueryIdentifierProto queryIdentifierProto;
@@ -107,26 +134,13 @@ public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskA
     }
   }
 
-  private static class PendingEventData {
-    final TaskHeartbeatInfo heartbeatInfo;
-    final List<TezEvent> tezEvents;
-
-    public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
-      this.heartbeatInfo = heartbeatInfo;
-      this.tezEvents = tezEvents;
-    }
-  }
-
   public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
       Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder,
       Token<LlapTokenIdentifier> llapToken) {
-    super(LlapTaskUmbilicalExternalClient.class.getName());
     this.conf = conf;
-    this.umbilical = new LlapTaskUmbilicalExternalImpl();
    this.tokenIdentifier = tokenIdentifier;
     this.sessionToken = sessionToken;
     this.responder = responder;
-    this.timer = new ScheduledThreadPoolExecutor(1);
     this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     // Add support for configurable threads, however 1 should always be enough.
@@ -134,37 +148,18 @@ public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifie
     this.communicator.init(conf);
   }
 
-  @Override
-  public void serviceStart() throws IOException {
-    // If we use a single server for multiple external clients, then consider using more than one handler.
-    int numHandlers = 1;
-    llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
-    communicator.start();
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    if (closed) {
-      throw new IllegalStateException("Client has already been closed");
+  private void terminateRequest() {
+    if (closed || heartbeatInfo == null) {
+      LOG.warn("No current request to terminate");
+      return;
     }
-    closed = true;
-    // Check if the request is registered - if so we can cancel the request
-    for (Map.Entry<String, TaskHeartbeatInfo> taskEntry : registeredTasks.entrySet()) {
-      terminateRequest(taskEntry.getValue());
-    }
-    registeredTasks.clear();
-
-    scheduleClientForCleanup(this);
-  }
-
-  private void terminateRequest(TaskHeartbeatInfo thi) {
     TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder();
-    builder.setQueryIdentifier(thi.queryIdentifierProto);
-    builder.setFragmentIdentifierString(thi.taskAttemptId);
+    builder.setQueryIdentifier(heartbeatInfo.queryIdentifierProto);
+    builder.setFragmentIdentifierString(heartbeatInfo.taskAttemptId);
 
-    final String taskAttemptId = thi.taskAttemptId;
-    communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port,
+    final String taskAttemptId = heartbeatInfo.taskAttemptId;
+    communicator.sendTerminateFragment(builder.build(), heartbeatInfo.hostname, heartbeatInfo.port,
         new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
 
           @Override
@@ -181,16 +176,8 @@ public void indicateError(Throwable t) {
         });
   }
 
-  private void doShutdown() throws IOException {
-    llapTaskUmbilicalServer.shutdownServer();
-    timer.shutdown();
-    if (this.communicator != null) {
-      this.communicator.stop();
-    }
-  }
-
   public InetSocketAddress getAddress() {
-    return llapTaskUmbilicalServer.getAddress();
+    return SharedUmbilicalServer.getInstance(conf).llapTaskUmbilicalServer.getAddress();
   }
 
@@ -213,151 +200,124 @@ public void submitWork(SubmitWorkRequestProto request, String llapHost, int llap
         vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
     final String fragmentId = attemptId.toString();
 
-    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
-    pendingEvents.putIfAbsent(
-        fragmentId, new PendingEventData(thi, Lists.newArrayList()));
+    this.heartbeatInfo = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort);
+    this.tezEvents = Lists.newArrayList();
 
-    // Setup timer task to check for hearbeat timeouts
-    timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
-        connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
+    SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf);
+    umbilicalServer.umbilicalProtocol.pendingClients.putIfAbsent(fragmentId, this);
+    umbilicalServer.llapTaskUmbilicalServer.addTokenForJob(tokenIdentifier, sessionToken);
 
     // Send out the actual SubmitWorkRequest
-    communicator.sendSubmitWork(request, llapHost, llapPort,
-        new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
-
-          @Override
-          public void setResponse(SubmitWorkResponseProto response) {
-            if (response.hasSubmissionState()) {
-              if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
-                String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
-                LOG.info(msg);
-                if (responder != null) {
-                  Throwable err = new RuntimeException(msg);
-                  responder.submissionFailed(fragmentId, err);
-                }
-                return;
-              }
-            }
-            if (response.hasUniqueNodeId()) {
-              thi.uniqueNodeId = response.getUniqueNodeId();
-            }
-          }
-
-          @Override
-          public void indicateError(Throwable t) {
-            String msg = "Failed to submit: " + fragmentId;
-            LOG.error(msg, t);
-            Throwable err = new RuntimeException(msg, t);
-            responder.submissionFailed(fragmentId, err);
-          }
-        });
+    final LlapTaskUmbilicalExternalClient client = this;
+    communicator.start();
+    WorkSubmitter submitter = new WorkSubmitter(this, request, llapHost, llapPort, fragmentId);
+    submitter.submitWork();
   }
 
-  private void updateHeartbeatInfo(String taskAttemptId) {
-    int updateCount = 0;
+  // Helper class to submit fragments to LLAP and retry rejected submissions.
+  static class WorkSubmitter implements LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto> {
+    private LlapTaskUmbilicalExternalClient client;
+    private SubmitWorkRequestProto request;
+    private String llapHost;
+    private int llapPort;
+    private String fragmentId;
 
-    PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
-    if (pendingEventData != null) {
-      pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
-      updateCount++;
-    }
-
-    TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
-    if (heartbeatInfo != null) {
-      heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
-      updateCount++;
-    }
-
-    if (updateCount == 0) {
-      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+    public WorkSubmitter(LlapTaskUmbilicalExternalClient client,
+        SubmitWorkRequestProto request, String llapHost, int llapPort, String fragmentId) {
+      this.client = client;
+      this.request = request;
+      this.llapHost = llapHost;
+      this.llapPort = llapPort;
+      this.fragmentId = fragmentId;
     }
-  }
 
-  private void updateHeartbeatInfo(
-      String hostname, String uniqueId, int port, TezAttemptArray tasks) {
-    int updateCount = 0;
-    HashSet<TezTaskAttemptID> attempts = new HashSet<>();
-    for (Writable w : tasks.get()) {
-      attempts.add((TezTaskAttemptID)w);
+    public void submitWork() {
+      client.communicator.sendSubmitWork(request, llapHost, llapPort, this);
     }
 
-    String error = "";
-    for (String key : pendingEvents.keySet()) {
-      PendingEventData pendingEventData = pendingEvents.get(key);
-      if (pendingEventData != null) {
-        TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo;
-        String thiUniqueId = thi.uniqueNodeId;
-        if (thi.hostname.equals(hostname) && thi.port == port
-            && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
-          TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
-          if (attempts.contains(ta)) {
-            thi.lastHeartbeat.set(System.currentTimeMillis());
-            updateCount++;
-          } else {
-            error += (thi.taskAttemptId + ", ");
+    @Override
+    public void setResponse(SubmitWorkResponseProto response) {
+      if (response.hasSubmissionState()) {
+        if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) {
+          String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
+          LOG.info(msg);
+
+          // Retry rejected requests
+          if (!client.closed) {
+            LOG.info("Resubmitting fragment " + fragmentId);
+            submitWork();
           }
+          return;
         }
       }
+      if (response.hasUniqueNodeId()) {
+        client.heartbeatInfo.uniqueNodeId = response.getUniqueNodeId();
+      }
     }
-    for (String key : registeredTasks.keySet()) {
-      TaskHeartbeatInfo thi = registeredTasks.get(key);
-      if (thi != null) {
-        String thiUniqueId = thi.uniqueNodeId;
-        if (thi.hostname.equals(hostname) && thi.port == port
-            && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
-          TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId);
-          if (attempts.contains(ta)) {
-            thi.lastHeartbeat.set(System.currentTimeMillis());
-            updateCount++;
-          } else {
-            error += (thi.taskAttemptId + ", ");
-          }
-        }
-      }
+
+    @Override
+    public void indicateError(Throwable t) {
+      String msg = "Failed to submit: " + fragmentId;
+      LOG.error(msg, t);
+      Throwable err = new RuntimeException(msg, t);
+      client.unregisterClient();
+      client.responder.submissionFailed(fragmentId, err);
     }
-    if (!error.isEmpty()) {
-      LOG.info("The tasks we expected to be on the node are not there: " + error);
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      terminateRequest();
+      unregisterClient();
     }
+  }
 
-    if (updateCount == 0) {
-      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+  private void unregisterClient() {
+    if (!closed && heartbeatInfo != null) {
+      communicator.stop();
+      SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf);
+      umbilicalServer.umbilicalProtocol.unregisterClient(heartbeatInfo.taskAttemptId);
+      umbilicalServer.llapTaskUmbilicalServer.removeTokenForJob(tokenIdentifier);
+      closed = true;
     }
   }
 
-  private class HeartbeatCheckTask implements Runnable {
+  long getLastHeartbeat() {
+    return this.heartbeatInfo.lastHeartbeat.get();
+  }
+
+  void setLastHeartbeat(long lastHeartbeat) {
+    this.heartbeatInfo.lastHeartbeat.set(lastHeartbeat);
+  }
+
+  // Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat.
+  private static class HeartbeatCheckTask implements Runnable {
+    LlapTaskUmbilicalExternalImpl umbilicalImpl;
+
+    public HeartbeatCheckTask(LlapTaskUmbilicalExternalImpl umbilicalImpl) {
+      this.umbilicalImpl = umbilicalImpl;
+    }
+
     public void run() {
       long currentTime = System.currentTimeMillis();
-      List<String> timedOutTasks = new ArrayList<String>();
-
-      // Check both pending and registered tasks for timeouts
-      for (String key : pendingEvents.keySet()) {
-        PendingEventData pendingEventData = pendingEvents.get(key);
-        if (pendingEventData != null) {
-          if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
-            timedOutTasks.add(key);
+      List<LlapTaskUmbilicalExternalClient> timedOutTasks = new ArrayList<LlapTaskUmbilicalExternalClient>();
+
+      // Only time out registered tasks that are running.
+      // Allow pending tasks to either get accepted or rejected by LLAP.
+      for (String key : umbilicalImpl.registeredClients.keySet()) {
+        LlapTaskUmbilicalExternalClient client = umbilicalImpl.registeredClients.get(key);
+        if (client != null) {
+          if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) {
+            timedOutTasks.add(client);
           }
         }
       }
-      for (String timedOutTask : timedOutTasks) {
-        LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
-        responder.heartbeatTimeout(timedOutTask);
-        pendingEvents.remove(timedOutTask);
-      }
-
-      timedOutTasks.clear();
-      for (String key : registeredTasks.keySet()) {
-        TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
-        if (heartbeatInfo != null) {
-          if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
-            timedOutTasks.add(key);
-          }
-        }
-      }
-      for (String timedOutTask : timedOutTasks) {
-        LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
-        responder.heartbeatTimeout(timedOutTask);
-        registeredTasks.remove(timedOutTask);
+      for (LlapTaskUmbilicalExternalClient timedOutTask : timedOutTasks) {
+        String taskAttemptId = timedOutTask.heartbeatInfo.taskAttemptId;
+        LOG.info("Running taskAttemptId " + taskAttemptId + " timed out");
+        timedOutTask.unregisterClient();
+        timedOutTask.responder.heartbeatTimeout(taskAttemptId);
       }
     }
   }
@@ -369,10 +329,20 @@ public void run() {
     void heartbeatTimeout(String fragmentId);
   }
 
+  private static class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+    final ConcurrentMap<String, LlapTaskUmbilicalExternalClient> pendingClients = new ConcurrentHashMap<>();
+    final ConcurrentMap<String, LlapTaskUmbilicalExternalClient> registeredClients = new ConcurrentHashMap<>();
+    private final ScheduledThreadPoolExecutor timer;
 
-  // Ideally, the server should be shared across all client sessions running on the same node.
-  private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+    public LlapTaskUmbilicalExternalImpl(Configuration conf) {
+      long taskInterval = HiveConf.getTimeVar(conf,
+          HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      // Setup timer task to check for heartbeat timeouts
+      this.timer = new ScheduledThreadPoolExecutor(1);
+      timer.scheduleAtFixedRate(new HeartbeatCheckTask(this),
+          taskInterval, taskInterval, TimeUnit.MILLISECONDS);
+    }
 
     @Override
     public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
@@ -399,33 +369,28 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
       // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
       String taskAttemptIdString = taskAttemptId.toString();
-
-      if (closed) {
-        LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString);
-        // Set shouldDie response so the LLAP daemon closes this umbilical connection.
-        response.setShouldDie();
-        return response;
-      }
-
       updateHeartbeatInfo(taskAttemptIdString);
 
       List<TezEvent> tezEvents = null;
-      PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
-      if (pendingEventData == null) {
+      LlapTaskUmbilicalExternalClient client = pendingClients.remove(taskAttemptIdString);
+      if (client == null) {
        tezEvents = Collections.emptyList();
-        // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
-        if (!registeredTasks.containsKey(taskAttemptIdString)) {
+        client = registeredClients.get(taskAttemptIdString);
+        if (client == null) {
+          // Heartbeat is from a task that we are not currently tracking.
          LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
          response.setShouldDie(); // Do any of the other fields need to be set?
          return response;
        }
      } else {
-        tezEvents = pendingEventData.tezEvents;
+        tezEvents = client.tezEvents;
        // Tasks removed from the pending list should then be added to the registered list.
-        registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
+        registeredClients.put(taskAttemptIdString, client);
      }
 
+      boolean shouldUnregisterClient = false;
+
      response.setLastRequestId(request.getRequestId());
      // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
      // Also since we have all the MRInput events here - they'll all be sent in together.
@@ -443,11 +408,11 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
        switch (eventType) {
          case TASK_ATTEMPT_COMPLETED_EVENT:
            LOG.debug("Task completed event for " + taskAttemptIdString);
-            registeredTasks.remove(taskAttemptIdString);
+            shouldUnregisterClient = true;
            break;
          case TASK_ATTEMPT_FAILED_EVENT:
            LOG.debug("Task failed event for " + taskAttemptIdString);
-            registeredTasks.remove(taskAttemptIdString);
+            shouldUnregisterClient = true;
            break;
          case TASK_STATUS_UPDATE_EVENT:
            // If we want to handle counters
@@ -459,10 +424,14 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
        }
      }
 
+      if (shouldUnregisterClient) {
+        client.unregisterClient();
+      }
+
      // Pass the request on to the responder
      try {
-        if (responder != null) {
-          responder.heartbeat(request);
+        if (client.responder != null) {
+          client.responder.heartbeat(request);
        }
      } catch (Exception err) {
        LOG.error("Error during responder execution", err);
@@ -482,14 +451,18 @@ public void nodeHeartbeat(
    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
      String taskAttemptIdString = taskAttemptId.toString();
      LOG.error("Task killed - " + taskAttemptIdString);
-      registeredTasks.remove(taskAttemptIdString);
-
-      try {
-        if (responder != null) {
-          responder.taskKilled(taskAttemptId);
+      LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
+      if (client != null) {
+        try {
+          client.unregisterClient();
+          if (client.responder != null) {
+            client.responder.taskKilled(taskAttemptId);
+          }
+        } catch (Exception err) {
+          LOG.error("Error during responder execution", err);
        }
-      } catch (Exception err) {
-        LOG.error("Error during responder execution", err);
+      } else {
+        LOG.info("Received task killed notification for task which is not currently being tracked: " + taskAttemptId);
      }
    }
 
@@ -504,38 +477,85 @@ public ProtocolSignature getProtocolSignature(String protocol, long clientVersio
      return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }
-  }
 
-  private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) {
-    // Add a bit of delay in case the daemon has not closed the umbilical connection yet.
-    clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS);
-  }
+    private void unregisterClient(String taskAttemptId) {
+      pendingClients.remove(taskAttemptId);
+      registeredClients.remove(taskAttemptId);
+    }
 
-  static final ScheduledThreadPoolExecutor clientCleanupExecuter = new ScheduledThreadPoolExecutor(1);
-  static final int cleanupDelay = 2000;
+    private void updateHeartbeatInfo(String taskAttemptId) {
+      int updateCount = 0;
 
-  static class ClientCleanupTask implements Runnable {
-    final LlapTaskUmbilicalExternalClient client;
+      LlapTaskUmbilicalExternalClient pendingClient = pendingClients.get(taskAttemptId);
+      if (pendingClient != null) {
+        pendingClient.setLastHeartbeat(System.currentTimeMillis());
+        updateCount++;
+      }
 
-    public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) {
-      this.client = client;
+      LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(taskAttemptId);
+      if (registeredClient != null) {
+        registeredClient.setLastHeartbeat(System.currentTimeMillis());
+        updateCount++;
+      }
+
+      if (updateCount == 0) {
+        LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+      }
    }
 
-    @Override
-    public void run() {
-      if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) {
-        // No more outstanding connections, ok to close.
-        try {
-          LOG.debug("Closing client");
-          client.doShutdown();
-        } catch (Exception err) {
-          LOG.error("Error cleaning up client", err);
+    private void updateHeartbeatInfo(
+        String hostname, String uniqueId, int port, TezAttemptArray tasks) {
+      int updateCount = 0;
+      HashSet<TezTaskAttemptID> attempts = new HashSet<>();
+      for (Writable w : tasks.get()) {
+        attempts.add((TezTaskAttemptID)w);
+      }
+
+      String error = "";
+      for (String key : pendingClients.keySet()) {
+        LlapTaskUmbilicalExternalClient pendingClient = pendingClients.get(key);
+        if (pendingClient != null) {
+          TaskHeartbeatInfo thi = pendingClient.heartbeatInfo;
+          if (doesClientMatchHeartbeat(pendingClient, hostname, uniqueId, port)) {
+            TezTaskAttemptID ta = TezTaskAttemptID.fromString(pendingClient.heartbeatInfo.taskAttemptId);
+            if (attempts.contains(ta)) {
+              pendingClient.setLastHeartbeat(System.currentTimeMillis());
+              updateCount++;
+            } else {
+              error += (pendingClient.heartbeatInfo.taskAttemptId + ", ");
+            }
+          }
        }
-      } else {
-        // Reschedule this task for later.
-        LOG.debug("Client still has umbilical connection - rescheduling cleanup.");
-        scheduleClientForCleanup(client);
      }
+
+      for (String key : registeredClients.keySet()) {
+        LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(key);
+        if (registeredClient != null) {
+          if (doesClientMatchHeartbeat(registeredClient, hostname, uniqueId, port)) {
+            TezTaskAttemptID ta = TezTaskAttemptID.fromString(registeredClient.heartbeatInfo.taskAttemptId);
+            if (attempts.contains(ta)) {
+              registeredClient.setLastHeartbeat(System.currentTimeMillis());
+              updateCount++;
+            } else {
+              error += (registeredClient.heartbeatInfo.taskAttemptId + ", ");
+            }
+          }
+        }
+      }
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+      }
+
+      if (updateCount == 0) {
+        LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+      }
+    }
+
+    private static boolean doesClientMatchHeartbeat(LlapTaskUmbilicalExternalClient client,
+        String hostname, String uniqueId, int port) {
+      return (hostname.equals(client.heartbeatInfo.hostname)
+          && port == client.heartbeatInfo.port
+          && uniqueId.equals(client.heartbeatInfo.uniqueNodeId));
    }
  }
 }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index 403381d..db0fef1 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -18,10 +18,15 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
@@ -32,6 +37,7 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,12 +48,11 @@
   protected volatile Server server;
   private final InetSocketAddress address;
   private final AtomicBoolean started = new AtomicBoolean(true);
+  JobTokenSecretManager jobTokenSecretManager;
+  Map<String, int[]> tokenRefMap = new HashMap<String, int[]>();
 
-  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
-      IOException {
-    JobTokenSecretManager jobTokenSecretManager =
-        new JobTokenSecretManager();
-    jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers) throws IOException {
+    jobTokenSecretManager = new JobTokenSecretManager();
 
     server = new RPC.Builder(conf)
         .setProtocol(LlapTaskUmbilicalProtocol.class)
@@ -65,7 +70,7 @@ public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umb
 
     this.address = NetUtils.getConnectAddress(server);
     LOG.info("Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
-      " with numHandlers=" + numHandlers);
+        " with numHandlers=" + numHandlers);
   }
 
   public InetSocketAddress getAddress() {
@@ -76,6 +81,33 @@ public int getNumOpenConnections() {
     return server.getNumOpenConnections();
   }
 
+  public synchronized void addTokenForJob(String tokenIdentifier, Token<JobTokenIdentifier> token) {
+    // Maintain count of outstanding requests for tokenIdentifier.
+    int[] refCount = tokenRefMap.get(tokenIdentifier);
+    if (refCount == null) {
+      refCount = new int[] { 0 };
+      tokenRefMap.put(tokenIdentifier, refCount);
+      // Should only need to insert the token the first time.
+      jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+    }
+    refCount[0]++;
+  }
+
+  public synchronized void removeTokenForJob(String tokenIdentifier) {
+    // Maintain count of outstanding requests for tokenIdentifier.
+    // If count goes to 0, it is safe to remove the token.
+    int[] refCount = tokenRefMap.get(tokenIdentifier);
+    if (refCount == null) {
+      LOG.warn("No refCount found for tokenIdentifier " + tokenIdentifier);
+    } else {
+      refCount[0]--;
+      if (refCount[0] <= 0) {
+        tokenRefMap.remove(tokenIdentifier);
+        jobTokenSecretManager.removeTokenForJob(tokenIdentifier);
+      }
+    }
+  }
+
   public void shutdownServer() {
     if (started.get()) { // Primarily to avoid multiple shutdowns.
       started.set(false);
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index eb93241..7394011 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -147,8 +147,8 @@ public LlapBaseInputFormat() {}
 
     LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient(job,
         submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), umbilicalResponder, llapToken);
-    llapClient.init(job);
-    llapClient.start();
+//    llapClient.init(job);
+//    llapClient.start();
 
     int attemptNum = 0;
     // Use task attempt number from conf if provided