diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 65f7232..a0fbcbb 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -111,7 +111,7 @@
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   // Tracks appMasters to which heartbeats are being sent. This should not be used for any other
   // messages like taskKilled, etc.
-  private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
+  private final Map<QueryIdentifier, AMNodeInfo> knownAppMasters = new HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
   private final DaemonId daemonId;
@@ -198,34 +198,43 @@ public void registerTask(String amLocation, int port, String user,
       Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
       TezTaskAttemptID attemptId) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
+      LOG.trace(
+          "Registering for heartbeat: {}, queryIdentifier={}, attemptId={}",
+          (amLocation + ":" + port), queryIdentifier, attemptId);
     }
     AMNodeInfo amNodeInfo;
+
+    // Since we don't have an explicit AM end signal yet - we're going to create
+    // and discard AMNodeInfo instances per query.
     synchronized (knownAppMasters) {
       LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout,
             socketFactory, conf);
-        knownAppMasters.put(amNodeId, amNodeInfo);
+        knownAppMasters.put(queryIdentifier, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
         // subsequent instances when it's taken off the queue.
         amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
         pendingHeartbeatQueeu.add(amNodeInfo);
+        // AMNodeInfo will only be cleared when a queryComplete is received for this query, or
+        // when we detect a failure on the AM side (failure to heartbeat).
+        // A single queueLookupCallable is added here. We have to make sure one instance stays
+        // in the queue till the query completes.
+ LOG.info("Pending heartbeat queue size={}", pendingHeartbeatQueeu.size()); } - amNodeInfo.setCurrentQueryIdentifier(queryIdentifier); amNodeInfo.addTaskAttempt(attemptId); } } - public void unregisterTask(String amLocation, int port, TezTaskAttemptID ta) { + public void unregisterTask(String amLocation, int port, QueryIdentifier queryIdentifier, TezTaskAttemptID ta) { + if (LOG.isTraceEnabled()) { - LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port); + LOG.trace("Un-registering for heartbeat: {}, attempt={}", (amLocation + ":" + port), ta); } AMNodeInfo amNodeInfo; - LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); synchronized (knownAppMasters) { - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } else { @@ -241,7 +250,7 @@ public void taskKilled(String amLocation, int port, String user, Token 0 if (amNodeInfo.getTaskCount() > 0) { // Add back to the queue for the next heartbeat, and schedule the actual heartbeat - long next = System.currentTimeMillis() + heartbeatInterval; - amNodeInfo.setNextHeartbeatTime(next); - pendingHeartbeatQueeu.add(amNodeInfo); ListenableFuture future = executor.submit(new AMHeartbeatCallable(amNodeInfo)); Futures.addCallback(future, new FutureCallback() { @Override @@ -312,9 +332,9 @@ public void onSuccess(Void result) { @Override public void onFailure(Throwable t) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); - LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", + LOG.warn("Heartbeat failed to AM {}. Marking query as failed. query={}", amNodeInfo.amNodeId, currentQueryIdentifier, t); queryFailedHandler.queryFailed(currentQueryIdentifier); } @@ -372,11 +392,10 @@ protected Void callInternal() { if (LOG.isTraceEnabled()) { LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo); } + LOG.info("Attempting to heartbeat to AM: " + amNodeInfo); List tasks = amNodeInfo.getTasksSnapshot(); if (tasks.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0"); - } + LOG.info("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0"); return null; } try { @@ -388,7 +407,7 @@ protected Void callInternal() { amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw); } catch (IOException e) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); LOG.warn("Failed to communicated with AM at {}. 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index af8f5b0..ca476ec 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -401,8 +401,7 @@ public QueryCompleteResponseProto queryComplete(
             fragmentInfo.getFragmentIdentifierString());
         executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
       }
-      LlapNodeId amNodeId = queryInfo.getAmNodeId();
-      amReporter.queryComplete(amNodeId);
+      amReporter.queryComplete(queryIdentifier);
     }
     return QueryCompleteResponseProto.getDefaultInstance();
   }
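The queryComplete hunk above and the heartbeat-failure path in AMReporter converge on the same per-query teardown: fragments of exactly one query are killed, and that query's reporter state is dropped. Below is a rough wiring sketch of that failure path, reusing the QueryId stand-in from the earlier sketch; the interface and class names here are hypothetical, not the LLAP types.

// Hypothetical wiring sketch. When a heartbeat to an AM fails, the reporter marks that
// query's entry as failed and reports the failure keyed by query, so the daemon can kill
// just that query's remaining fragments, much as the queryComplete path above does.
interface QueryFailedHandlerSketch {
  void queryFailed(HeartbeatRegistrySketch.QueryId queryId);
}

final class FragmentKillingHandlerSketch implements QueryFailedHandlerSketch {
  @Override
  public void queryFailed(HeartbeatRegistrySketch.QueryId queryId) {
    // A real implementation would look up the query's live fragments and issue one
    // kill per outstanding fragment; here we only log which query is being torn down.
    System.out.println("Killing remaining fragments for query "
        + queryId.appId + "/" + queryId.dagIndex);
  }
}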
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 25dc569..f24a647 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -188,7 +188,8 @@ protected TaskRunner2Result callInternal() throws Exception {
 
       // Unregister from the AMReporter, since the task is now running.
       TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
-      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta);
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(),
+          fragmentInfo.getQueryInfo().getQueryIdentifier(), ta);
 
       synchronized (this) {
         if (!shouldRunTask) {
@@ -355,7 +356,9 @@ public void killTask() {
           // If the task hasn't started - inform about fragment completion immediately. It's possible for
           // the callable to never run.
           fragmentCompletionHanler.fragmentComplete(fragmentInfo);
-          this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta);
+          this.amReporter
+              .unregisterTask(request.getAmHost(), request.getAmPort(),
+                  fragmentInfo.getQueryInfo().getQueryIdentifier(), ta);
         }
       }
     } else {
diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index e593b33..2f0d426 100644
--- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -761,10 +761,10 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
   @Override
   public void nodeHeartbeat(
       Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException {
-    nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
     }
+    nodePinged(hostname.toString(), uniqueId.toString(), port, aw);
   }
 
   @Override
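Taken together, the caller-side hunks thread the query identity through every reporter call: TaskRunnerCallable passes it when a task starts running or is killed before running, and ContainerRunnerImpl passes it on query completion. Below is a hedged end-to-end usage of the registry sketch from earlier, with invented host, port, and query values, roughly mirroring the order of those calls; it is an illustration of the pattern, not the daemon's code.

// Hypothetical usage of HeartbeatRegistrySketch; all values are invented for illustration.
final class UsageSketch {
  public static void main(String[] args) {
    HeartbeatRegistrySketch reporter = new HeartbeatRegistrySketch();
    HeartbeatRegistrySketch.QueryId query = new HeartbeatRegistrySketch.QueryId("app_0001", 1);

    // Fragment accepted and queued: queued-task heartbeats start for this query
    // (registerTask in the patch also carries the AM address, job token and attempt id).
    reporter.registerTask("am-host.example.com", 12345, query);

    // Task starts running, or is killed before it ever runs (TaskRunnerCallable paths):
    // the entry is located by query, not by AM address alone, and its task count drops.
    reporter.unregisterTask(query);

    // Query finished (ContainerRunnerImpl.queryComplete path): the per-query entry,
    // and with it the heartbeat scheduling, is removed exactly once.
    reporter.queryComplete(query);
  }
}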