diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java index ad39963614..25e10f4e6c 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java @@ -15,6 +15,8 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.security.PrivilegedAction; import java.util.HashSet; import java.util.Iterator; @@ -430,6 +432,9 @@ public void onRemoval(RemovalNotification arg) { protected final ProtocolType getProxy( final LlapNodeId nodeId, final Token nodeToken) { String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort()); + if (LOG.isDebugEnabled()) { + LOG.debug("Getting host proxies for {}", hostId); + } try { return hostProxies.get(hostId, new Callable() { @Override @@ -481,7 +486,16 @@ public ProtocolType run() { } private String getHostIdentifier(String hostname, int port) { - return hostname + ":" + port; + StringBuilder sb = new StringBuilder(); + try { + InetAddress inetAddress = InetAddress.getByName(hostname); + sb.append(inetAddress.getHostAddress()).append(":"); + } catch (UnknownHostException e) { + // ignore + LOG.warn("Unable to determine IP address for host: {}.. Ignoring..", hostname, e); + } + sb.append(hostname).append(":").append(port); + return sb.toString(); } protected abstract ProtocolType createProtocolImpl(Configuration config, String hostname, int port, diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index dc10f22bf9..e5dc378f62 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -724,13 +724,13 @@ public void registerKnownNode(LlapNodeId nodeId) { } } - public void registerPingingNode(LlapNodeId nodeId) { + public void registerPingingNode(LlapNodeId nodeId, String uniqueId) { long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); PingingNodeInfo ni = new PingingNodeInfo(currentTs); PingingNodeInfo old = pingedNodeMap.put(nodeId, ni); if (old == null) { if (LOG.isInfoEnabled()) { - LOG.info("Added new pinging node: [{}]", nodeId); + LOG.info("Added new pinging node: [{}] with uniqueId: {}", nodeId, uniqueId); } } else { old.pingCount.incrementAndGet(); @@ -758,7 +758,7 @@ void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray tasks, BooleanArray guaranteed) { // TODO: do we ever need the port? we could just do away with nodeId altogether. LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port); - registerPingingNode(nodeId); + registerPingingNode(nodeId, uniqueId); BiMap biMap = entityTracker.getContainerAttemptMapForNode(nodeId); if (biMap != null) { @@ -793,9 +793,16 @@ void nodePinged(String hostname, String uniqueId, int port, getContext().containerAlive(entry.getKey()); } } - } - if (!error.isEmpty()) { - LOG.info("The tasks we expected to be on the node are not there: " + error); + + if (!error.isEmpty()) { + LOG.info("The tasks we expected to be on the node are not there: " + error); + for (Map.Entry entry : biMap.entrySet()) { + LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " + + "registered with different unique ID", entry.getValue()); + getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED, + "Node with same host and port but with new unique ID pinged"); + } + } } } else { long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);