diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index dc10f22bf9..e5dc378f62 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -724,13 +724,13 @@ public void registerKnownNode(LlapNodeId nodeId) { } } - public void registerPingingNode(LlapNodeId nodeId) { + public void registerPingingNode(LlapNodeId nodeId, String uniqueId) { long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); PingingNodeInfo ni = new PingingNodeInfo(currentTs); PingingNodeInfo old = pingedNodeMap.put(nodeId, ni); if (old == null) { if (LOG.isInfoEnabled()) { - LOG.info("Added new pinging node: [{}]", nodeId); + LOG.info("Added new pinging node: [{}] with uniqueId: {}", nodeId, uniqueId); } } else { old.pingCount.incrementAndGet(); @@ -758,7 +758,7 @@ void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks, BooleanArray guaranteed) { // TODO: do we ever need the port? we could just do away with nodeId altogether. LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port); - registerPingingNode(nodeId); + registerPingingNode(nodeId, uniqueId); BiMap biMap = entityTracker.getContainerAttemptMapForNode(nodeId); if (biMap != null) { @@ -793,9 +793,16 @@ void nodePinged(String hostname, String uniqueId, int port, getContext().containerAlive(entry.getKey()); } } - } - if (!error.isEmpty()) { - LOG.info("The tasks we expected to be on the node are not there: " + error); + + if (!error.isEmpty()) { + LOG.info("The tasks we expected to be on the node are not there: " + error); + for (Map.Entry entry : biMap.entrySet()) { + LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " + + "registered with different unique ID", entry.getValue()); + getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED, + "Node with same host and port but with new unique ID pinged"); + } + } } } else { long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);