diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index 7fff147..cb38839 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -58,6 +58,7 @@ protected final long timeout; protected final Closeable client; private final Closeable socket; + private boolean closed = false; public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobConf job, Closeable client, Closeable socket) { @@ -78,27 +79,31 @@ public Schema getSchema() { } @Override - public void close() throws IOException { - Exception caughtException = null; - try { - din.close(); - } catch (Exception err) { - LOG.error("Error closing input stream:" + err.getMessage(), err); - caughtException = err; - } - // Don't close the socket - the stream already does that if needed. + public synchronized void close() throws IOException { + if (!closed) { + closed = true; - if (client != null) { + Exception caughtException = null; try { - client.close(); + din.close(); } catch (Exception err) { - LOG.error("Error closing client:" + err.getMessage(), err); - caughtException = (caughtException == null ? err : caughtException); + LOG.error("Error closing input stream:" + err.getMessage(), err); + caughtException = err; + } + // Don't close the socket - the stream already does that if needed. + + if (client != null) { + try { + client.close(); + } catch (Exception err) { + LOG.error("Error closing client:" + err.getMessage(), err); + caughtException = (caughtException == null ? err : caughtException); + } } - } - if (caughtException != null) { - throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException); + if (caughtException != null) { + throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException); + } } } @@ -156,28 +161,40 @@ public boolean next(NullWritable key, V value) throws IOException { return false; } } catch (IOException io) { - if (Thread.interrupted()) { - // Either we were interrupted by one of: - // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue - // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. - // Either way we should not try to block trying to read the reader events queue. - if (readerEvents.isEmpty()) { - // Case 2. - throw io; - } else { - // Case 1. Fail the reader, sending back the error we received from the reader event. - ReaderEvent event = getReaderEvent(); - switch (event.getEventType()) { - case ERROR: - throw new IOException("Received reader event error: " + event.getMessage(), io); - default: - throw new IOException("Got reader event type " + event.getEventType() - + ", expected error event", io); + try { + if (Thread.interrupted()) { + // Either we were interrupted by one of: + // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue + // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. + // Either way we should not try to block trying to read the reader events queue. + if (readerEvents.isEmpty()) { + // Case 2. + throw io; + } else { + // Case 1. Fail the reader, sending back the error we received from the reader event. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case ERROR: + throw new IOException("Received reader event error: " + event.getMessage(), io); + default: + throw new IOException("Got reader event type " + event.getEventType() + + ", expected error event", io); + } } + } else { + // If we weren't interrupted, just propagate the error + throw io; + } + } finally { + // The external client handling umbilical responses and the connection to read the incoming + // data are not coupled. Calling close() here to make sure an error in one will cause the + // other to be closed as well. + try { + close(); + } catch (Exception err) { + // Don't propagate errors from close() since this will lose the original error above. + LOG.error("Closing RecordReader due to error and hit another error during close()", err); } - } else { - // If we weren't interrupted, just propagate the error - throw io; } } } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java index 95b0ffc..3ae37dc 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java @@ -41,9 +41,10 @@ private int vertexParallelism; public SubmitWorkInfo(ApplicationId fakeAppId, long creationTime, - int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature) { + int vertexParallelism, byte[] vertexSpec, byte[] vertexSpecSignature, + Token token) { this.fakeAppId = fakeAppId; - this.token = createJobToken(); + this.token = token; this.creationTime = creationTime; this.vertexSpec = vertexSpec; this.vertexSpecSignature = vertexSpecSignature; @@ -126,17 +127,6 @@ public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOExce return submitWorkInfo; } - - private Token createJobToken() { - String tokenIdentifier = fakeAppId.toString(); - JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( - tokenIdentifier)); - Token sessionToken = new Token(identifier, - new JobTokenSecretManager()); - sessionToken.setService(identifier.getJobId()); - return sessionToken; - } - public byte[] getVertexBinary() { return vertexSpec; } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 406bdda..0c1f19a 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -59,7 +59,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -71,26 +70,57 @@ import org.slf4j.LoggerFactory; -public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable { +public class LlapTaskUmbilicalExternalClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); + private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1); + private final LlapProtocolClientProxy communicator; private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer; private final Configuration conf; - private final LlapTaskUmbilicalProtocol umbilical; protected final String tokenIdentifier; protected final Token sessionToken; - - private final ConcurrentMap pendingEvents = new ConcurrentHashMap<>(); - private final ConcurrentMap registeredTasks= new ConcurrentHashMap(); private LlapTaskUmbilicalExternalResponder responder = null; - private final ScheduledThreadPoolExecutor timer; private final long connectionTimeout; private volatile boolean closed = false; + private RequestInfo requestInfo; + List tezEvents; + + // Using a shared instance of the umbilical server. + private static class SharedUmbilicalServer { + LlapTaskUmbilicalExternalImpl umbilicalProtocol; + LlapTaskUmbilicalServer llapTaskUmbilicalServer; + + private volatile static SharedUmbilicalServer instance; + private static final Object lock = new Object(); + + static SharedUmbilicalServer getInstance(Configuration conf) { + SharedUmbilicalServer value = instance; + if (value == null) { + synchronized (lock) { + if (instance == null) { + instance = new SharedUmbilicalServer(conf); + } + value = instance; + } + } + return value; + } - private static class TaskHeartbeatInfo { + private SharedUmbilicalServer(Configuration conf) { + try { + umbilicalProtocol = new LlapTaskUmbilicalExternalImpl(conf); + llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilicalProtocol, 1); + } catch (Exception err) { + throw new ExceptionInInitializerError(err); + } + } + } + + private static class RequestInfo { + final SubmitWorkRequestProto request; final QueryIdentifierProto queryIdentifierProto; final String taskAttemptId; final String hostname; @@ -98,7 +128,9 @@ final int port; final AtomicLong lastHeartbeat = new AtomicLong(); - public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) { + public RequestInfo(SubmitWorkRequestProto request, QueryIdentifierProto queryIdentifierProto, + String taskAttemptId, String hostname, int port) { + this.request = request; this.queryIdentifierProto = queryIdentifierProto; this.taskAttemptId = taskAttemptId; this.hostname = hostname; @@ -107,26 +139,13 @@ public TaskHeartbeatInfo(QueryIdentifierProto queryIdentifierProto, String taskA } } - private static class PendingEventData { - final TaskHeartbeatInfo heartbeatInfo; - final List tezEvents; - - public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List tezEvents) { - this.heartbeatInfo = heartbeatInfo; - this.tezEvents = tezEvents; - } - } - public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token sessionToken, LlapTaskUmbilicalExternalResponder responder, Token llapToken) { - super(LlapTaskUmbilicalExternalClient.class.getName()); this.conf = conf; - this.umbilical = new LlapTaskUmbilicalExternalImpl(); this.tokenIdentifier = tokenIdentifier; this.sessionToken = sessionToken; this.responder = responder; - this.timer = new ScheduledThreadPoolExecutor(1); this.connectionTimeout = 3 * HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); // Add support for configurable threads, however 1 should always be enough. @@ -134,37 +153,18 @@ public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifie this.communicator.init(conf); } - @Override - public void serviceStart() throws IOException { - // If we use a single server for multiple external clients, then consider using more than one handler. - int numHandlers = 1; - llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken); - communicator.start(); - } - - @Override - public void serviceStop() throws Exception { - if (closed) { - throw new IllegalStateException("Client has already been closed"); - } - closed = true; - - // Check if the request is registered - if so we can cancel the request - for (Map.Entry taskEntry : registeredTasks.entrySet()) { - terminateRequest(taskEntry.getValue()); + private void terminateRequest() { + if (closed || requestInfo == null) { + LOG.warn("No current request to terminate"); + return; } - registeredTasks.clear(); - scheduleClientForCleanup(this); - } - - private void terminateRequest(TaskHeartbeatInfo thi) { TerminateFragmentRequestProto.Builder builder = TerminateFragmentRequestProto.newBuilder(); - builder.setQueryIdentifier(thi.queryIdentifierProto); - builder.setFragmentIdentifierString(thi.taskAttemptId); + builder.setQueryIdentifier(requestInfo.queryIdentifierProto); + builder.setFragmentIdentifierString(requestInfo.taskAttemptId); - final String taskAttemptId = thi.taskAttemptId; - communicator.sendTerminateFragment(builder.build(), thi.hostname, thi.port, + final String taskAttemptId = requestInfo.taskAttemptId; + communicator.sendTerminateFragment(builder.build(), requestInfo.hostname, requestInfo.port, new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override @@ -181,16 +181,8 @@ public void indicateError(Throwable t) { }); } - private void doShutdown() throws IOException { - llapTaskUmbilicalServer.shutdownServer(); - timer.shutdown(); - if (this.communicator != null) { - this.communicator.stop(); - } - } - public InetSocketAddress getAddress() { - return llapTaskUmbilicalServer.getAddress(); + return SharedUmbilicalServer.getInstance(conf).llapTaskUmbilicalServer.getAddress(); } @@ -213,151 +205,139 @@ public void submitWork(SubmitWorkRequestProto request, String llapHost, int llap vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber()); final String fragmentId = attemptId.toString(); - final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(queryIdentifierProto, fragmentId, llapHost, llapPort); - pendingEvents.putIfAbsent( - fragmentId, new PendingEventData(thi, Lists.newArrayList())); + this.requestInfo = new RequestInfo(request, queryIdentifierProto, fragmentId, llapHost, llapPort); - // Setup timer task to check for hearbeat timeouts - timer.scheduleAtFixedRate(new HeartbeatCheckTask(), - connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS); + this.tezEvents = Lists.newArrayList(); + SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf); + umbilicalServer.umbilicalProtocol.pendingClients.putIfAbsent(fragmentId, this); + umbilicalServer.llapTaskUmbilicalServer.addTokenForJob(tokenIdentifier, sessionToken); // Send out the actual SubmitWorkRequest - communicator.sendSubmitWork(request, llapHost, llapPort, - new LlapProtocolClientProxy.ExecuteRequestCallback() { - - @Override - public void setResponse(SubmitWorkResponseProto response) { - if (response.hasSubmissionState()) { - if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) { - String msg = "Fragment: " + fragmentId + " rejected. Server Busy."; - LOG.info(msg); - if (responder != null) { - Throwable err = new RuntimeException(msg); - responder.submissionFailed(fragmentId, err); - } - return; - } - } - if (response.hasUniqueNodeId()) { - thi.uniqueNodeId = response.getUniqueNodeId(); - } - } - - @Override - public void indicateError(Throwable t) { - String msg = "Failed to submit: " + fragmentId; - LOG.error(msg, t); - Throwable err = new RuntimeException(msg, t); - responder.submissionFailed(fragmentId, err); - } - }); + final LlapTaskUmbilicalExternalClient client = this; + communicator.start(); + submitWork(); } - private void updateHeartbeatInfo(String taskAttemptId) { - int updateCount = 0; - - PendingEventData pendingEventData = pendingEvents.get(taskAttemptId); - if (pendingEventData != null) { - pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } - - TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId); - if (heartbeatInfo != null) { - heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } - - if (updateCount == 0) { - LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId); + private void submitWork() { + if (!closed) { + communicator.sendSubmitWork(requestInfo.request, + requestInfo.hostname, requestInfo.port, new SubmitWorkCallback(this)); } } - private void updateHeartbeatInfo( - String hostname, String uniqueId, int port, TezAttemptArray tasks) { - int updateCount = 0; - HashSet attempts = new HashSet<>(); - for (Writable w : tasks.get()) { - attempts.add((TezTaskAttemptID)w); + // Helper class to submit fragments to LLAP and retry rejected submissions. + static class SubmitWorkCallback implements LlapProtocolClientProxy.ExecuteRequestCallback { + private LlapTaskUmbilicalExternalClient client; + + public SubmitWorkCallback(LlapTaskUmbilicalExternalClient client) { + this.client = client; } - String error = ""; - for (String key : pendingEvents.keySet()) { - PendingEventData pendingEventData = pendingEvents.get(key); - if (pendingEventData != null) { - TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo; - String thiUniqueId = thi.uniqueNodeId; - if (thi.hostname.equals(hostname) && thi.port == port - && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) { - TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId); - if (attempts.contains(ta)) { - thi.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } else { - error += (thi.taskAttemptId + ", "); + @Override + public void setResponse(SubmitWorkResponseProto response) { + if (response.hasSubmissionState()) { + if (response.getSubmissionState().equals(SubmissionStateProto.REJECTED)) { + String fragmentId = this.client.requestInfo.taskAttemptId; + String msg = "Fragment: " + fragmentId + " rejected. Server Busy."; + LOG.info(msg); + + // Retry rejected requests + if (!client.closed) { + // Update lastHeartbeat so we don't timeout during the retry + client.setLastHeartbeat(System.currentTimeMillis()); + long retryDelay = HiveConf.getTimeVar(client.conf, + HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, + TimeUnit.MILLISECONDS); + LOG.info("Queueing fragment for resubmission: " + fragmentId); + final SubmitWorkCallback submitter = this; + retryExecutor.schedule( + new Runnable() { + @Override + public void run() { + client.submitWork(); + } + }, + retryDelay, TimeUnit.MILLISECONDS); } + return; } } + if (response.hasUniqueNodeId()) { + client.requestInfo.uniqueNodeId = response.getUniqueNodeId(); + } } - for (String key : registeredTasks.keySet()) { - TaskHeartbeatInfo thi = registeredTasks.get(key); - if (thi != null) { - String thiUniqueId = thi.uniqueNodeId; - if (thi.hostname.equals(hostname) && thi.port == port - && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) { - TezTaskAttemptID ta = TezTaskAttemptID.fromString(thi.taskAttemptId); - if (attempts.contains(ta)) { - thi.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } else { - error += (thi.taskAttemptId + ", "); - } - } - } + @Override + public void indicateError(Throwable t) { + String fragmentId = this.client.requestInfo.taskAttemptId; + String msg = "Failed to submit: " + fragmentId; + LOG.error(msg, t); + Throwable err = new RuntimeException(msg, t); + client.unregisterClient(); + client.responder.submissionFailed(fragmentId, err); } - if (!error.isEmpty()) { - LOG.info("The tasks we expected to be on the node are not there: " + error); + } + + @Override + public void close() { + if (!closed) { + terminateRequest(); + unregisterClient(); } + } - if (updateCount == 0) { - LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port); + private void unregisterClient() { + if (!closed && requestInfo != null) { + communicator.stop(); + SharedUmbilicalServer umbilicalServer = SharedUmbilicalServer.getInstance(conf); + umbilicalServer.umbilicalProtocol.unregisterClient(requestInfo.taskAttemptId); + umbilicalServer.llapTaskUmbilicalServer.removeTokenForJob(tokenIdentifier); + closed = true; } } - private class HeartbeatCheckTask implements Runnable { + long getLastHeartbeat() { + return this.requestInfo.lastHeartbeat.get(); + } + + void setLastHeartbeat(long lastHeartbeat) { + this.requestInfo.lastHeartbeat.set(lastHeartbeat); + } + + // Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat. + private static class HeartbeatCheckTask implements Runnable { + LlapTaskUmbilicalExternalImpl umbilicalImpl; + + public HeartbeatCheckTask(LlapTaskUmbilicalExternalImpl umbilicalImpl) { + this.umbilicalImpl = umbilicalImpl; + } + public void run() { long currentTime = System.currentTimeMillis(); - List timedOutTasks = new ArrayList(); + List timedOutTasks = new ArrayList(); // Check both pending and registered tasks for timeouts - for (String key : pendingEvents.keySet()) { - PendingEventData pendingEventData = pendingEvents.get(key); - if (pendingEventData != null) { - if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) { - timedOutTasks.add(key); + for (String key : umbilicalImpl.pendingClients.keySet()) { + LlapTaskUmbilicalExternalClient client = umbilicalImpl.pendingClients.get(key); + if (client != null) { + if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) { + timedOutTasks.add(client); } } } - for (String timedOutTask : timedOutTasks) { - LOG.info("Pending taskAttemptId " + timedOutTask + " timed out"); - responder.heartbeatTimeout(timedOutTask); - pendingEvents.remove(timedOutTask); - } - - timedOutTasks.clear(); - for (String key : registeredTasks.keySet()) { - TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key); - if (heartbeatInfo != null) { - if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) { - timedOutTasks.add(key); + for (String key : umbilicalImpl.registeredClients.keySet()) { + LlapTaskUmbilicalExternalClient client = umbilicalImpl.registeredClients.get(key); + if (client != null) { + if (currentTime - client.getLastHeartbeat() >= client.connectionTimeout) { + timedOutTasks.add(client); } } } - for (String timedOutTask : timedOutTasks) { - LOG.info("Running taskAttemptId " + timedOutTask + " timed out"); - responder.heartbeatTimeout(timedOutTask); - registeredTasks.remove(timedOutTask); + for (LlapTaskUmbilicalExternalClient timedOutTask : timedOutTasks) { + String taskAttemptId = timedOutTask.requestInfo.taskAttemptId; + LOG.info("Running taskAttemptId " + taskAttemptId + " timed out"); + timedOutTask.unregisterClient(); + timedOutTask.responder.heartbeatTimeout(taskAttemptId); } } } @@ -369,10 +349,20 @@ public void run() { void heartbeatTimeout(String fragmentId); } + private static class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol { + final ConcurrentMap pendingClients = new ConcurrentHashMap<>(); + final ConcurrentMap registeredClients = new ConcurrentHashMap<>(); + private final ScheduledThreadPoolExecutor timer; - // Ideally, the server should be shared across all client sessions running on the same node. - private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol { + public LlapTaskUmbilicalExternalImpl(Configuration conf) { + long taskInterval = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + // Setup timer task to check for hearbeat timeouts + this.timer = new ScheduledThreadPoolExecutor(1); + timer.scheduleAtFixedRate(new HeartbeatCheckTask(this), + taskInterval, taskInterval, TimeUnit.MILLISECONDS); + } @Override public boolean canCommit(TezTaskAttemptID taskid) throws IOException { @@ -399,33 +389,28 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this. TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); String taskAttemptIdString = taskAttemptId.toString(); - - if (closed) { - LOG.info("Client has already been closed, but received heartbeat from " + taskAttemptIdString); - // Set shouldDie response so the LLAP daemon closes this umbilical connection. - response.setShouldDie(); - return response; - } - updateHeartbeatInfo(taskAttemptIdString); List tezEvents = null; - PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString); - if (pendingEventData == null) { + LlapTaskUmbilicalExternalClient client = pendingClients.remove(taskAttemptIdString); + if (client == null) { tezEvents = Collections.emptyList(); - // If this heartbeat was not from a pending event and it's not in our list of registered tasks, - if (!registeredTasks.containsKey(taskAttemptIdString)) { + client = registeredClients.get(taskAttemptIdString); + if (client == null) { + // Heartbeat is from a task that we are not currently tracking. LOG.info("Unexpected heartbeat from " + taskAttemptIdString); response.setShouldDie(); // Do any of the other fields need to be set? return response; } } else { - tezEvents = pendingEventData.tezEvents; + tezEvents = client.tezEvents; // Tasks removed from the pending list should then be added to the registered list. - registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo); + registeredClients.put(taskAttemptIdString, client); } + boolean shouldUnregisterClient = false; + response.setLastRequestId(request.getRequestId()); // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task. // Also since we have all the MRInput events here - they'll all be sent in together. @@ -443,11 +428,11 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce switch (eventType) { case TASK_ATTEMPT_COMPLETED_EVENT: LOG.debug("Task completed event for " + taskAttemptIdString); - registeredTasks.remove(taskAttemptIdString); + shouldUnregisterClient = true; break; case TASK_ATTEMPT_FAILED_EVENT: LOG.debug("Task failed event for " + taskAttemptIdString); - registeredTasks.remove(taskAttemptIdString); + shouldUnregisterClient = true; break; case TASK_STATUS_UPDATE_EVENT: // If we want to handle counters @@ -459,10 +444,14 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce } } + if (shouldUnregisterClient) { + client.unregisterClient(); + } + // Pass the request on to the responder try { - if (responder != null) { - responder.heartbeat(request); + if (client.responder != null) { + client.responder.heartbeat(request); } } catch (Exception err) { LOG.error("Error during responder execution", err); @@ -474,6 +463,9 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce @Override public void nodeHeartbeat( Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId); + } updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port, aw); // No need to propagate to this to the responder } @@ -482,14 +474,18 @@ public void nodeHeartbeat( public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { String taskAttemptIdString = taskAttemptId.toString(); LOG.error("Task killed - " + taskAttemptIdString); - registeredTasks.remove(taskAttemptIdString); - - try { - if (responder != null) { - responder.taskKilled(taskAttemptId); + LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString); + if (client != null) { + try { + client.unregisterClient(); + if (client.responder != null) { + client.responder.taskKilled(taskAttemptId); + } + } catch (Exception err) { + LOG.error("Error during responder execution", err); } - } catch (Exception err) { - LOG.error("Error during responder execution", err); + } else { + LOG.info("Received task killed notification for task which is not currently being tracked: " + taskAttemptId); } } @@ -504,38 +500,84 @@ public ProtocolSignature getProtocolSignature(String protocol, long clientVersio return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } - } - private static void scheduleClientForCleanup(LlapTaskUmbilicalExternalClient client) { - // Add a bit of delay in case the daemon has not closed the umbilical connection yet. - clientCleanupExecuter.schedule(new ClientCleanupTask(client), cleanupDelay, TimeUnit.MILLISECONDS); - } + private void unregisterClient(String taskAttemptId) { + pendingClients.remove(taskAttemptId); + registeredClients.remove(taskAttemptId); + } - static final ScheduledThreadPoolExecutor clientCleanupExecuter = new ScheduledThreadPoolExecutor(1); - static final int cleanupDelay = 2000; + private void updateHeartbeatInfo(String taskAttemptId) { + int updateCount = 0; - static class ClientCleanupTask implements Runnable { - final LlapTaskUmbilicalExternalClient client; + LlapTaskUmbilicalExternalClient pendingClient = pendingClients.get(taskAttemptId); + if (pendingClient != null) { + pendingClient.setLastHeartbeat(System.currentTimeMillis()); + updateCount++; + } - public ClientCleanupTask(LlapTaskUmbilicalExternalClient client) { - this.client = client; + LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(taskAttemptId); + if (registeredClient != null) { + registeredClient.setLastHeartbeat(System.currentTimeMillis()); + updateCount++; + } + + if (updateCount == 0) { + LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId); + } } - @Override - public void run() { - if (client.llapTaskUmbilicalServer.getNumOpenConnections() == 0) { - // No more outstanding connections, ok to close. - try { - LOG.debug("Closing client"); - client.doShutdown(); - } catch (Exception err) { - LOG.error("Error cleaning up client", err); + private void updateHeartbeatInfo( + String hostname, String uniqueId, int port, TezAttemptArray tasks) { + int updateCount = 0; + HashSet attempts = new HashSet<>(); + for (Writable w : tasks.get()) { + attempts.add((TezTaskAttemptID)w); + } + + String error = ""; + for (String key : pendingClients.keySet()) { + LlapTaskUmbilicalExternalClient pendingClient = pendingClients.get(key); + if (pendingClient != null) { + if (doesClientMatchHeartbeat(pendingClient, hostname, uniqueId, port)) { + TezTaskAttemptID ta = TezTaskAttemptID.fromString(pendingClient.requestInfo.taskAttemptId); + if (attempts.contains(ta)) { + pendingClient.setLastHeartbeat(System.currentTimeMillis()); + updateCount++; + } else { + error += (pendingClient.requestInfo.taskAttemptId + ", "); + } + } } - } else { - // Reschedule this task for later. - LOG.debug("Client still has umbilical connection - rescheduling cleanup."); - scheduleClientForCleanup(client); } + + for (String key : registeredClients.keySet()) { + LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(key); + if (registeredClient != null) { + if (doesClientMatchHeartbeat(registeredClient, hostname, uniqueId, port)) { + TezTaskAttemptID ta = TezTaskAttemptID.fromString(registeredClient.requestInfo.taskAttemptId); + if (attempts.contains(ta)) { + registeredClient.setLastHeartbeat(System.currentTimeMillis()); + updateCount++; + } else { + error += (registeredClient.requestInfo.taskAttemptId + ", "); + } + } + } + } + if (!error.isEmpty()) { + LOG.info("The tasks we expected to be on the node are not there: " + error); + } + + if (updateCount == 0) { + LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port); + } + } + + private static boolean doesClientMatchHeartbeat(LlapTaskUmbilicalExternalClient client, + String hostname, String uniqueId, int port) { + return (hostname.equals(client.requestInfo.hostname) + && port == client.requestInfo.port + && uniqueId.equals(client.requestInfo.uniqueNodeId)); } } } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java index 403381d..89cb6fb 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java @@ -18,10 +18,15 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; @@ -32,6 +37,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.runtime.api.impl.TezEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +48,11 @@ protected volatile Server server; private final InetSocketAddress address; private final AtomicBoolean started = new AtomicBoolean(true); + private JobTokenSecretManager jobTokenSecretManager; + private Map tokenRefMap = new HashMap(); - public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token token) throws - IOException { - JobTokenSecretManager jobTokenSecretManager = - new JobTokenSecretManager(); - jobTokenSecretManager.addTokenForJob(tokenIdentifier, token); + public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers) throws IOException { + jobTokenSecretManager = new JobTokenSecretManager(); server = new RPC.Builder(conf) .setProtocol(LlapTaskUmbilicalProtocol.class) @@ -65,7 +70,7 @@ public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umb this.address = NetUtils.getConnectAddress(server); LOG.info( "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address + - " with numHandlers=" + numHandlers); + " with numHandlers=" + numHandlers); } public InetSocketAddress getAddress() { @@ -76,6 +81,33 @@ public int getNumOpenConnections() { return server.getNumOpenConnections(); } + public synchronized void addTokenForJob(String tokenIdentifier, Token token) { + // Maintain count of outstanding requests for tokenIdentifier. + int[] refCount = tokenRefMap.get(tokenIdentifier); + if (refCount == null) { + refCount = new int[] { 0 }; + tokenRefMap.put(tokenIdentifier, refCount); + // Should only need to insert the token the first time. + jobTokenSecretManager.addTokenForJob(tokenIdentifier, token); + } + refCount[0]++; + } + + public synchronized void removeTokenForJob(String tokenIdentifier) { + // Maintain count of outstanding requests for tokenIdentifier. + // If count goes to 0, it is safe to remove the token. + int[] refCount = tokenRefMap.get(tokenIdentifier); + if (refCount == null) { + LOG.warn("No refCount found for tokenIdentifier " + tokenIdentifier); + } else { + refCount[0]--; + if (refCount[0] <= 0) { + tokenRefMap.remove(tokenIdentifier); + jobTokenSecretManager.removeTokenForJob(tokenIdentifier); + } + } + } + public void shutdownServer() { if (started.get()) { // Primarily to avoid multiple shutdowns. started.set(false); diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index eb93241..201f5fa 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -147,8 +147,6 @@ public LlapBaseInputFormat() {} LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), umbilicalResponder, llapToken); - llapClient.init(job); - llapClient.start(); int attemptNum = 0; // Use task attempt number from conf if provided diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index d4ec44e..51e5d3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.FieldDesc; @@ -90,6 +91,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; @@ -368,6 +371,9 @@ public PlanFragment createPlanFragment(String query, int num) queryUser = UserGroupInformation.getCurrentUser().getUserName(); } + // Generate umbilical token (applies to all splits) + Token umbilicalToken = createJobToken(applicationId); + LOG.info("Number of splits: " + (eventList.size() - 1)); SignedMessage signedSvs = null; for (int i = 0; i < eventList.size() - 1; i++) { @@ -388,7 +394,7 @@ public PlanFragment createPlanFragment(String query, int num) SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId, System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message, - signedSvs.signature); + signedSvs.signature, umbilicalToken); byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); // 3. Generate input event. @@ -406,6 +412,16 @@ public PlanFragment createPlanFragment(String query, int num) } } + private Token createJobToken(ApplicationId applicationId) { + String tokenIdentifier = applicationId.toString(); + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( + tokenIdentifier)); + Token sessionToken = new Token(identifier, + new JobTokenSecretManager()); + sessionToken.setService(identifier.getJobId()); + return sessionToken; + } + private SplitLocationInfo[] makeLocationHints(TaskLocationHint hint) { Set hosts = hint.getHosts(); if (hosts.size() != 1) {