diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index e5dc378f62..b168f7648e 100644
--- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -762,46 +762,37 @@ void nodePinged(String hostname, String uniqueId, int port,
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
-      HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
-      for (int i = 0; i < tasks.get().length; ++i) {
-        boolean isGuaranteed = false;
-        if (guaranteed != null) {
-          isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
-        }
-        attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
-      }
-      String error = "";
+      Set<TezTaskAttemptID> error = new HashSet<>();
       synchronized (biMap) {
-        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-          // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
-          // or at least (in this case) track the latest unique ID for LlapNode and retry all
-          // older-node tasks proactively. For now let the heartbeats fail them.
-          TezTaskAttemptID attemptId = entry.getValue();
-          String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
-          // Unique ID is registered based on Submit response. Theoretically, we could get a ping
-          // when the task is valid but we haven't stored the unique ID yet, so taskNodeId is null.
-          // However, the next heartbeat(s) should get the value eventually and mark task as alive.
-          // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
-          if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
-            Boolean isGuaranteed = attempts.get(attemptId);
-            if (isGuaranteed != null) {
-              getContext().taskAlive(attemptId);
-              scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
+        for (int i = 0; i < tasks.get().length; ++i) {
+          boolean isGuaranteed = false;
+          if (guaranteed != null) {
+            isGuaranteed = ((BooleanWritable) guaranteed.get()[i]).get();
+          }
+          TezTaskAttemptID attemptID = (TezTaskAttemptID) tasks.get()[i];
+
+          // Check if the taskAttempt is present in AM view
+          if (biMap.containsValue(attemptID)) {
+            String taskNodeId = entityTracker.getUniqueNodeId(attemptID);
+            if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+              getContext().taskAlive(attemptID);
+              scheduler.taskInfoUpdated(attemptID, isGuaranteed);
+              getContext().containerAlive(biMap.inverse().get(attemptID));
             } else {
-              error += (attemptId + ", ");
+              error.add(attemptID);
            }
-            getContext().containerAlive(entry.getKey());
           }
         }
+      }

-        if (!error.isEmpty()) {
-          LOG.info("The tasks we expected to be on the node are not there: " + error);
-          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-            LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
-                "registered with different unique ID", entry.getValue());
-            getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+        for (TezTaskAttemptID attempt: error) {
+          LOG.info("Sending a kill for attempt {}, due to a ping from " +
+              "node with same host and same port but " +
+              "registered with different unique ID", attempt);
+          getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED,
               "Node with same host and port but with new unique ID pinged");
-          }
         }
       }
     } else {
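
For reviewers, the control flow after this patch can be modeled in isolation roughly as below. This is a minimal sketch, not the plugin code: it assumes Guava's HashBiMap for the container-to-attempt map, uses plain String IDs in place of ContainerId/TezTaskAttemptID, replaces the TaskCommunicatorContext and scheduler callbacks with hypothetical local methods, and omits the guaranteed-flag update (scheduler.taskInfoUpdated) for brevity.

// Minimal sketch of the revised nodePinged flow (hypothetical stand-in types).
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NodePingSketch {

  // Stand-in for the per-node container -> attempt BiMap kept by the entity tracker.
  private final BiMap<String, String> containerToAttempt = HashBiMap.create();
  // Stand-in for entityTracker.getUniqueNodeId(attempt).
  private final Map<String, String> attemptToUniqueNodeId;

  public NodePingSketch(Map<String, String> attemptToUniqueNodeId) {
    this.attemptToUniqueNodeId = attemptToUniqueNodeId;
  }

  public void register(String containerId, String attemptId) {
    containerToAttempt.put(containerId, attemptId);
  }

  // Mirrors the patched loop: iterate over the attempts reported by the pinging daemon
  // (not over the whole container map), keep only those the AM still tracks for this
  // node, mark matching ones alive, and kill the ones whose registered unique node ID
  // differs from the pinging node's.
  public void nodePinged(String uniqueId, List<String> reportedAttempts) {
    Set<String> mismatched = new HashSet<>();
    synchronized (containerToAttempt) {
      for (String attemptId : reportedAttempts) {
        // Only react to attempts that are present in the AM view for this node.
        if (!containerToAttempt.containsValue(attemptId)) {
          continue;
        }
        String taskNodeId = attemptToUniqueNodeId.get(attemptId);
        if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
          taskAlive(attemptId);
          containerAlive(containerToAttempt.inverse().get(attemptId));
        } else {
          mismatched.add(attemptId);
        }
      }
    }
    // As in the patch, kills are issued outside the synchronized block.
    for (String attemptId : mismatched) {
      taskKilled(attemptId, "Node with same host and port but with new unique ID pinged");
    }
  }

  // Hypothetical callbacks standing in for the TaskCommunicatorContext calls.
  private void taskAlive(String attemptId) { System.out.println("alive: " + attemptId); }
  private void containerAlive(String containerId) { System.out.println("container alive: " + containerId); }
  private void taskKilled(String attemptId, String reason) {
    System.out.println("kill " + attemptId + ": " + reason);
  }
}

The key behavioral difference captured here is that the kill loop now targets only the mismatched attempts collected from the ping itself, rather than every attempt in the node's container map, and the per-ping guaranteed flags are applied directly while iterating instead of being staged in a temporary HashMap.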