diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 93237e6..adf6f5a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -20,6 +20,7 @@ import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.HashSet; @@ -126,9 +127,9 @@ public AMReporter(int numExecutors, int maxThreads, AtomicReference jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) { - // Not re-using the connection for the AM heartbeat - which may or may not be open by this point. - // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection. LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); - AMNodeInfo amNodeInfo = - new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, - conf); + AMNodeInfo amNodeInfo; + synchronized (knownAppMasters) { + amNodeInfo = knownAppMasters.get(amNodeId); + if (amNodeInfo == null) { + amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, + conf); + } + } // Even if the service hasn't started up. It's OK to make this invocation since this will // only happen after the AtomicReference address has been populated. Not adding an additional check. @@ -265,6 +269,28 @@ public void onFailure(Throwable t) { }); } + public void stopUmbilical(final QueryIdentifier queryIdentifier) { + synchronized (knownAppMasters) { + for (Iterator> entryIterator = knownAppMasters.entrySet().iterator(); + entryIterator.hasNext(); ) { + Map.Entry entry = entryIterator.next(); + AMNodeInfo amNodeInfo = entry.getValue(); + if (amNodeInfo != null) { + QueryIdentifier currentQueryIdentifier = entry.getValue().getCurrentQueryIdentifier(); + if (currentQueryIdentifier != null && currentQueryIdentifier.equals(queryIdentifier)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Query complete received for {}. Stopping umbilical and removing AMNodeInfo {}.", + queryIdentifier, amNodeInfo); + } + amNodeInfo.stopUmbilical(); + entryIterator.remove(); + break; + } + } + } + } + } + private class QueueLookupCallable extends CallableWithNdc { @Override @@ -272,7 +298,7 @@ protected Void callInternal() { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); - if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) { + if (amNodeInfo.hasAmFailed()) { synchronized (knownAppMasters) { if (LOG.isDebugEnabled()) { LOG.debug( @@ -281,8 +307,8 @@ protected Void callInternal() { amNodeInfo.hasAmFailed(), amNodeInfo); } knownAppMasters.remove(amNodeInfo.amNodeId); + amNodeInfo.stopUmbilical(); } - amNodeInfo.stopUmbilical(); } else { // Add back to the queue for the next heartbeat, and schedule the actual heartbeat long next = System.currentTimeMillis() + heartbeatInterval; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6908138..6fbc001 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -391,6 +391,7 @@ public QueryCompleteResponseProto queryComplete( fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } + amReporter.stopUmbilical(queryIdentifier); return QueryCompleteResponseProto.getDefaultInstance(); }