diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index a30f8b9..65f7232 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -16,7 +16,6 @@
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
-import org.apache.hadoop.io.ArrayWritable;

 import java.util.ArrayList;
@@ -26,7 +25,6 @@
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;

 import javax.net.SocketFactory;
@@ -44,7 +42,6 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

 import com.google.common.util.concurrent.FutureCallback;
@@ -127,9 +124,9 @@ public AMReporter(int numExecutors, int maxThreads, AtomicReference
       jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
-    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
-    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-    AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
-            conf);
+    AMNodeInfo amNodeInfo;
+    synchronized (knownAppMasters) {
+      amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
+            conf);
+      }
+    }
     // Even if the service hasn't started up. It's OK to make this invocation since this will
     // only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -266,6 +266,20 @@ public void onFailure(Throwable t) {
     });
   }

+  public void queryComplete(LlapNodeId llapNodeId) {
+    if (llapNodeId != null) {
+      synchronized (knownAppMasters) {
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete
+        // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for
+        // it to be closed after max idle timeout (10s default)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Query complete received. Removed {}.", amNodeInfo);
+        }
+      }
+    }
+  }
+
   private class QueueLookupCallable extends CallableWithNdc<Void> {

     @Override
@@ -273,7 +287,7 @@ protected Void callInternal() {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
@@ -283,28 +297,29 @@ protected Void callInternal() {
               }
               knownAppMasters.remove(amNodeInfo.amNodeId);
             }
-            amNodeInfo.stopUmbilical();
           } else {
-            // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
-            long next = System.currentTimeMillis() + heartbeatInterval;
-            amNodeInfo.setNextHeartbeatTime(next);
-            pendingHeartbeatQueeu.add(amNodeInfo);
-            ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
-            Futures.addCallback(future, new FutureCallback<Void>() {
-              @Override
-              public void onSuccess(Void result) {
-                // Nothing to do.
-              }
-
-              @Override
-              public void onFailure(Throwable t) {
-                QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
-                amNodeInfo.setAmFailed(true);
-                LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+            if (amNodeInfo.getTaskCount() > 0) {
+              // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
+              long next = System.currentTimeMillis() + heartbeatInterval;
+              amNodeInfo.setNextHeartbeatTime(next);
+              pendingHeartbeatQueeu.add(amNodeInfo);
+              ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
+              Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                  // Nothing to do.
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+                  amNodeInfo.setAmFailed(true);
+                  LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
                     amNodeInfo.amNodeId, currentQueryIdentifier, t);
-                queryFailedHandler.queryFailed(currentQueryIdentifier);
-              }
-            });
+                  queryFailedHandler.queryFailed(currentQueryIdentifier);
+                }
+              });
+            }
           }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index cc4eff0..1176e5e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -240,12 +241,12 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
     Token jobToken = TokenCache.getSessionToken(credentials);

+    LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort());
     QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
         queryIdentifier, qIdProto.getApplicationIdString(), dagId,
         vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-        vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
-        request.getAmPort());
+        vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
     String[] localDirs = fragmentInfo.getLocalDirs();
     Preconditions.checkNotNull(localDirs);
@@ -388,14 +389,18 @@ public QueryCompleteResponseProto queryComplete(
         new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
             request.getQueryIdentifier().getDagIndex());
     LOG.info("Processing queryComplete notification for {}", queryIdentifier);
-    List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
-        queryIdentifier, request.getDeleteDelay(), false);
-    LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
+    QueryInfo queryInfo = queryTracker.queryComplete(queryIdentifier, request.getDeleteDelay(), false);
+    if (queryInfo != null) {
+      List<QueryFragmentInfo> knownFragments = queryInfo.getRegisteredFragments();
+      LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
-    for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
+      for (QueryFragmentInfo fragmentInfo : knownFragments) {
+        LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
-      executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+        executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+      }
+      LlapNodeId amNodeId = queryInfo.getAmNodeId();
+      amReporter.queryComplete(amNodeId);
     }
     return QueryCompleteResponseProto.getDefaultInstance();
   }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index eaa3e7e..088f07c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -35,6 +35,7 @@
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
@@ -55,6 +56,7 @@
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private String[] localDirs;
+  private final LlapNodeId amNodeId;

   // Map of states for different vertices.
   private final Set knownFragments =
@@ -67,11 +69,11 @@
   private final AtomicReference umbilicalUgi;

   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-    String dagName, String hiveQueryIdString,
-    int dagIdentifier, String user,
-    ConcurrentMap sourceStateMap,
-    String[] localDirsBase, FileSystem localFs, String tokenUserName,
-    String tokenAppId) {
+      String dagName, String hiveQueryIdString,
+      int dagIdentifier, String user,
+      ConcurrentMap sourceStateMap,
+      String[] localDirsBase, FileSystem localFs, String tokenUserName,
+      String tokenAppId, final LlapNodeId amNodeId) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagIdString = dagIdString;
@@ -85,6 +87,7 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag
     this.tokenUserName = tokenUserName;
     this.appId = tokenAppId;
     this.umbilicalUgi = new AtomicReference<>();
+    this.amNodeId = amNodeId;
   }

   public QueryIdentifier getQueryIdentifier() {
@@ -115,6 +118,10 @@ public String getUser() {
     return sourceStateMap;
   }

+  public LlapNodeId getAmNodeId() {
+    return amNodeId;
+  }
+
   public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber,
       SignableVertexSpec vertexSpec, String fragmentIdString) {
     QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5cf3a38..7e646c5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -69,8 +70,6 @@
   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap =
       new ConcurrentHashMap<>();
-
-
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private final String clusterId;
@@ -137,9 +136,10 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId
    * Register a new fragment for a specific query
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
-      String user, SignableVertexSpec vertex, Token appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
+      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
+      int attemptNumber,
+      String user, SignableVertexSpec vertex, Token appToken,
+      String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {

     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -169,13 +169,13 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
             new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
                 dagIdentifier, user, getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
-                tokenInfo.userName, tokenInfo.appId);
+                tokenInfo.userName, tokenInfo.appId, amNodeId);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
         } else {
           // Ensure the UGI is setup once.
-          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
+          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
           isExistingQueryInfo = false;
         }
       }
@@ -238,7 +238,7 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
    * @param queryIdentifier
    * @param deleteDelay
    */
-  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
+  QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
       boolean isInternal) throws IOException {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
     }
@@ -255,7 +255,7 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
       if (queryInfo == null) {
         // Should not happen.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
-        return Collections.emptyList();
+        return null;
       }
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
@@ -292,7 +292,7 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
       if (savedQueryId != null) {
         ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
-      return queryInfo.getRegisteredFragments();
+      return queryInfo;
     } finally {
       dagLock.writeLock().unlock();
     }
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index ae3328a..d1fce19 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -93,7 +93,7 @@ public static QueryInfo createQueryInfo() {
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string",
             "fake_dag_name", "fakeHiveQueryId", 1, "fakeUser", new ConcurrentHashMap(),
-            new String[0], null, "fakeUser", null);
+            new String[0], null, "fakeUser", null, null);
     return queryInfo;
   }