diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 93237e6..cae2622 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.HashSet;
@@ -126,9 +127,12 @@ public AMReporter(int numExecutors, int maxThreads, AtomicReference
       jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
-    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
-    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-    AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
-            conf);
+    AMNodeInfo amNodeInfo;
+    synchronized (knownAppMasters) {
+      amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
+            conf);
+      }
+    }
     // Even if the service hasn't started up. It's OK to make this invocation since this will
     // only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -265,6 +269,20 @@ public void onFailure(Throwable t) {
       });
   }

+  public void queryComplete(LlapNodeId llapNodeId) {
+    if (llapNodeId != null) {
+      synchronized (knownAppMasters) {
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete
+        // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for
+        // it to be closed after max idle timeout (10s default)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Query complete received. Removed {}.", amNodeInfo);
+        }
+      }
+    }
+  }
+
   private class QueueLookupCallable extends CallableWithNdc {

     @Override
@@ -272,7 +290,7 @@ protected Void callInternal() {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
@@ -281,8 +299,8 @@ protected Void callInternal() {
                     amNodeInfo.hasAmFailed(), amNodeInfo);
               }
               knownAppMasters.remove(amNodeInfo.amNodeId);
+              amNodeInfo.stopUmbilical();
             }
-            amNodeInfo.stopUmbilical();
           } else {
             // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
             long next = System.currentTimeMillis() + heartbeatInterval;
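The AMReporter change above has two halves: registerTask() now reuses a cached AMNodeInfo keyed by the AM's LlapNodeId instead of building a fresh one per task, and the new queryComplete() evicts that entry when the DAG finishes (the heartbeat loop likewise moves stopUmbilical() inside the synchronized block and no longer drops a connection just because its task count hit zero). Below is a minimal sketch of the get-or-create-under-lock pattern, using hypothetical stand-in types rather than the real LlapNodeId/AMNodeInfo; the excerpt above does not show where the real code inserts into knownAppMasters, so the put() here is an assumption.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Illustrative sketch only: K stands in for LlapNodeId, C for AMNodeInfo.
    class AmConnectionCache<K, C> {
      private final Map<K, C> knownAppMasters = new HashMap<>();

      // registerTask(): reuse the cached connection when present; create it
      // under the same lock so two concurrent registrations for one AM
      // cannot race and build duplicate umbilical connections.
      C getOrCreate(K amNodeId, Function<K, C> factory) {
        synchronized (knownAppMasters) {
          C conn = knownAppMasters.get(amNodeId);
          if (conn == null) {
            conn = factory.apply(amNodeId);
            knownAppMasters.put(amNodeId, conn); // assumed; not shown in the excerpt
          }
          return conn;
        }
      }

      // queryComplete(): evict the entry; per the TODO above, the umbilical
      // itself stays open and is closed by the idle timeout until HIVE-16021.
      C evict(K amNodeId) {
        synchronized (knownAppMasters) {
          return knownAppMasters.remove(amNodeId);
        }
      }
    }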
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6908138..d2ebc6e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -235,11 +236,12 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);

+    LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort());
     QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
         queryIdentifier, qIdProto.getApplicationIdString(), dagId,
         vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-        vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
+        vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);

     String[] localDirs = fragmentInfo.getLocalDirs();
     Preconditions.checkNotNull(localDirs);
@@ -382,6 +384,8 @@ public QueryCompleteResponseProto queryComplete(
         new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
             request.getQueryIdentifier().getDagIndex());
     LOG.info("Processing queryComplete notification for {}", queryIdentifier);
+    LlapNodeId amNodeId = queryTracker.getAMNodeIdForQuery(queryIdentifier);
+    amReporter.queryComplete(amNodeId);
     List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
         queryIdentifier, request.getDeleteDelay(), false);
     LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
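In ContainerRunnerImpl, submitWork() derives the AM's node id from the request's am-host/am-port pair and threads it into registerFragment(), and queryComplete() looks that id back up via queryTracker.getAMNodeIdForQuery() so the reporter's cached connection is evicted before the tracker tears the query down. Both caches are keyed by LlapNodeId; the sketch below shows the canonical host:port value-object pattern such a key relies on. NodeId here is a hypothetical stand-in, not Hive's LlapNodeId implementation.

    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for LlapNodeId: a host:port value object with a
    // canonical-instance factory so equal addresses hit the same map entries.
    final class NodeId {
      private static final Map<String, NodeId> CACHE = new ConcurrentHashMap<>();

      private final String host;
      private final int port;

      private NodeId(String host, int port) {
        this.host = host;
        this.port = port;
      }

      // Mirrors the LlapNodeId.getInstance(host, port) call sites above.
      static NodeId getInstance(String host, int port) {
        return CACHE.computeIfAbsent(host + ":" + port, k -> new NodeId(host, port));
      }

      @Override
      public boolean equals(Object o) {
        if (this == o) {
          return true;
        }
        if (!(o instanceof NodeId)) {
          return false;
        }
        NodeId other = (NodeId) o;
        return port == other.port && host.equals(other.host);
      }

      @Override
      public int hashCode() {
        return Objects.hash(host, port);
      }
    }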
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 9eaddd2..f8faada 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -68,7 +69,7 @@
   private final ScheduledExecutorService executorService;

   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
-
+  private final ConcurrentHashMap<QueryIdentifier, LlapNodeId> queryAMNodeId = new ConcurrentHashMap<>();

   private final String[] localDirsBase;
@@ -137,9 +138,10 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId
    * Register a new fragment for a specific query
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
-      String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
+      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
+      int attemptNumber,
+      String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
+      String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {

     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -160,6 +162,7 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
         tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
       }
       boolean isExistingQueryInfo = true;
+      queryAMNodeId.putIfAbsent(queryIdentifier, amNodeId);
       QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
       if (queryInfo == null) {
         if (UserGroupInformation.isSecurityEnabled()) {
@@ -290,6 +293,7 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
       if (savedQueryId != null) {
         ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
+      queryAMNodeId.remove(queryIdentifier);
       return queryInfo.getRegisteredFragments();
     } finally {
       dagLock.writeLock().unlock();
@@ -414,4 +418,8 @@ private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IO
   public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
     return checkPermissionsAndGetQuery(queryId) != null;
   }
+
+  public LlapNodeId getAMNodeIdForQuery(QueryIdentifier queryIdentifier) {
+    return queryAMNodeId.get(queryIdentifier);
+  }
 }
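QueryTracker now carries a queryAMNodeId map alongside queryInfoMap: registerFragment() records the AM node for a query on first use (putIfAbsent, so later fragments keep the original mapping), getAMNodeIdForQuery() serves the lookup made from ContainerRunnerImpl.queryComplete(), and the entry is removed when the query completes. A compact sketch of that lifecycle, with hypothetical stand-ins for QueryIdentifier and LlapNodeId:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Illustrative sketch: Q stands in for QueryIdentifier, N for LlapNodeId.
    class AmNodeRegistry<Q, N> {
      private final ConcurrentMap<Q, N> queryAMNodeId = new ConcurrentHashMap<>();

      // registerFragment(): the first fragment of a query records its AM;
      // later fragments of the same query leave the existing mapping untouched.
      void onFragmentRegistered(Q queryId, N amNodeId) {
        queryAMNodeId.putIfAbsent(queryId, amNodeId);
      }

      // getAMNodeIdForQuery(): used by ContainerRunnerImpl.queryComplete()
      // to find which cached AM connection to evict.
      N lookup(Q queryId) {
        return queryAMNodeId.get(queryId);
      }

      // queryComplete(): in the real code this removal happens while the
      // DAG write lock is held, so no new fragment can race the teardown.
      void onQueryComplete(Q queryId) {
        queryAMNodeId.remove(queryId);
      }
    }

One design point worth noting: putIfAbsent assumes every fragment of a DAG reports the same AM address; if the AM were ever to move mid-query, the stale mapping would persist until queryComplete removes it.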