diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index a30f8b9..d967753 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.HashSet;
@@ -127,9 +128,12 @@ public AMReporter(int numExecutors, int maxThreads, AtomicReference
       Token<JobTokenIdentifier> jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
-    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
-    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-    AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
-            conf);
+    AMNodeInfo amNodeInfo;
+    synchronized (knownAppMasters) {
+      amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
+            conf);
+      }
+    }
     // Even if the service hasn't started up. It's OK to make this invocation since this will
     // only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -266,6 +270,20 @@ public void onFailure(Throwable t) {
       });
   }
 
+  public void queryComplete(LlapNodeId llapNodeId) {
+    if (llapNodeId != null) {
+      synchronized (knownAppMasters) {
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete
+        // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for
+        // it to be closed after max idle timeout (10s default)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Query complete received. Removed {}.", amNodeInfo);
+        }
+      }
+    }
+  }
+
   private class QueueLookupCallable extends CallableWithNdc<Void> {
 
     @Override
@@ -273,7 +291,7 @@ protected Void callInternal() {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
@@ -282,29 +300,31 @@ protected Void callInternal() {
                     amNodeInfo.hasAmFailed(), amNodeInfo);
               }
               knownAppMasters.remove(amNodeInfo.amNodeId);
+              amNodeInfo.stopUmbilical();
             }
-            amNodeInfo.stopUmbilical();
           } else {
-            // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
-            long next = System.currentTimeMillis() + heartbeatInterval;
-            amNodeInfo.setNextHeartbeatTime(next);
-            pendingHeartbeatQueeu.add(amNodeInfo);
-            ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
-            Futures.addCallback(future, new FutureCallback<Void>() {
-              @Override
-              public void onSuccess(Void result) {
-                // Nothing to do.
-              }
-
-              @Override
-              public void onFailure(Throwable t) {
-                QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
-                amNodeInfo.setAmFailed(true);
-                LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+            if (amNodeInfo.getTaskCount() > 0) {
+              // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
+              long next = System.currentTimeMillis() + heartbeatInterval;
+              amNodeInfo.setNextHeartbeatTime(next);
+              pendingHeartbeatQueeu.add(amNodeInfo);
+              ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
+              Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                  // Nothing to do.
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+                  amNodeInfo.setAmFailed(true);
+                  LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
                       amNodeInfo.amNodeId, currentQueryIdentifier, t);
-                queryFailedHandler.queryFailed(currentQueryIdentifier);
-              }
-            });
+                  queryFailedHandler.queryFailed(currentQueryIdentifier);
+                }
+              });
+            }
           }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index cc4eff0..51e777c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -240,12 +241,12 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
+    LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort());
     QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
         queryIdentifier, qIdProto.getApplicationIdString(), dagId,
         vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-        vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
-        request.getAmPort());
+        vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
 
     String[] localDirs = fragmentInfo.getLocalDirs();
     Preconditions.checkNotNull(localDirs);
@@ -397,6 +398,8 @@ public QueryCompleteResponseProto queryComplete(
             fragmentInfo.getFragmentIdentifierString());
         executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
       }
 
+      LlapNodeId amNodeId = queryTracker.removeAMNodeIdForQuery(queryIdentifier);
+      amReporter.queryComplete(amNodeId);
       return QueryCompleteResponseProto.getDefaultInstance();
     }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5cf3a38..ec176d2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -68,7 +69,7 @@
   private final ScheduledExecutorService executorService;
 
   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
-
+  private final ConcurrentHashMap<QueryIdentifier, LlapNodeId> queryAMNodeId = new ConcurrentHashMap<>();
 
   private final String[] localDirsBase;
 
@@ -137,9 +138,10 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId
   /**
    * Register a new fragment for a specific query
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
-      String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
+      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
+      int attemptNumber,
+      String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
+      String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -160,6 +162,7 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
       tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
     }
     boolean isExistingQueryInfo = true;
+    queryAMNodeId.putIfAbsent(queryIdentifier, amNodeId);
     QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
     if (queryInfo == null) {
       if (UserGroupInformation.isSecurityEnabled()) {
@@ -175,7 +178,7 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
         queryInfo = old;
       } else {
         // Ensure the UGI is setup once.
-        queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
+        queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
         isExistingQueryInfo = false;
       }
     }
@@ -416,4 +419,8 @@ private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IO
   public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
     return checkPermissionsAndGetQuery(queryId) != null;
   }
+
+  public LlapNodeId removeAMNodeIdForQuery(QueryIdentifier queryIdentifier) {
+    return queryAMNodeId.remove(queryIdentifier);
+  }
 }
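
Reviewer note: the three files above cooperate to cache AM umbilical connections and evict them when a query completes. Below is a minimal, self-contained sketch of that scheme; every class and member name in it (AmConnectionCache, NodeId, AmConnection, connectionForKill, queryToAmNode) is an illustrative stand-in, not the Hive API.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AmConnectionCache {

  // Stand-in for LlapNodeId: an AM endpoint key (host:port).
  static final class NodeId {
    final String hostPort;
    NodeId(String hostPort) { this.hostPort = hostPort; }
  }

  // Stand-in for AMNodeInfo: a handle on an open umbilical connection.
  static final class AmConnection {
    final NodeId nodeId;
    AmConnection(NodeId nodeId) { this.nodeId = nodeId; }
  }

  // Mirrors AMReporter.knownAppMasters: a shared registry guarded by its own lock.
  // (In the real code this map is populated on task registration, which the diff does not touch.)
  private final Map<String, AmConnection> knownAppMasters = new HashMap<>();

  // Mirrors QueryTracker.queryAMNodeId: remembers which AM each query talks to,
  // so queryComplete() can evict the right cache entry later.
  private final ConcurrentHashMap<String, NodeId> queryToAmNode = new ConcurrentHashMap<>();

  // Mirrors the registerFragment() change: the first fragment of a query records its AM.
  void registerFragment(String queryId, NodeId amNode) {
    queryToAmNode.putIfAbsent(queryId, amNode);
  }

  // Mirrors the patched taskKilled(): reuse a cached connection when one exists,
  // otherwise fall back to a transient one. Note the patch does not insert the
  // fallback into the map at this point.
  AmConnection connectionForKill(NodeId amNode) {
    synchronized (knownAppMasters) {
      AmConnection conn = knownAppMasters.get(amNode.hostPort);
      return conn != null ? conn : new AmConnection(amNode);
    }
  }

  // Mirrors ContainerRunnerImpl.queryComplete() -> AMReporter.queryComplete():
  // drop the query's AM mapping and evict the cached connection. The connection
  // itself is not closed here, since late taskKill requests may still use it; it
  // is left to the idle timeout (see the TODO referencing HIVE-16021).
  void queryComplete(String queryId) {
    NodeId amNode = queryToAmNode.remove(queryId);
    if (amNode != null) {
      synchronized (knownAppMasters) {
        knownAppMasters.remove(amNode.hostPort);
      }
    }
  }
}

The design choice worth noting: reuse and eviction both synchronize on the map itself, so a kill can never observe a half-removed entry, while the actual socket teardown is deferred to the idle timeout rather than done under the lock.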