diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index d327fc0..32bc59c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -310,7 +310,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, LOG.info( "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd"); - LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId); + LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId); // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself if (nodeId != null) { TerminateFragmentRequestProto request = @@ -515,12 +515,13 @@ void unregisterTaskAttempt(TezTaskAttemptID attemptId) { synchronized(bMap) { matched = bMap.inverse().remove(attemptId); } - } - // Removing here. Registration into the map has to make sure to put - if (bMap.isEmpty()) { - nodeMap.remove(llapNodeId); + // Removing here. Registration into the map has to make sure to put + if (bMap.isEmpty()) { + nodeMap.remove(llapNodeId); + } } + // Remove the container mapping if (matched != null) { containerToNodeMap.remove(matched); @@ -530,18 +531,24 @@ void unregisterTaskAttempt(TezTaskAttemptID attemptId) { void registerContainer(ContainerId containerId, String hostname, int port) { containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port)); + // TODO: Register with the node as well ? Similar to registering a task attempt. + // TODO: May need to consider additional synchronization + // TODO: Add tests for various scenarios. i.e. the various threading scenarios that can be hit. + // ... container added, task added, container removed + // ... container added, task added, container removed, task removed + // ... etc } LlapNodeId getNodeIdForContainer(ContainerId containerId) { return containerToNodeMap.get(containerId); } - LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) { + LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) { return attemptToNodeMap.get(taskAttemptId); } ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) { - LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId); + LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId); if (llapNodeId != null) { BiMap bMap = nodeMap.get(llapNodeId).inverse(); if (bMap != null) { @@ -581,11 +588,13 @@ void unregisterContainer(ContainerId containerId) { synchronized(bMap) { matched = bMap.remove(containerId); } + + // Removing here. Registration into the map has to make sure to put + if (bMap.isEmpty()) { + nodeMap.remove(llapNodeId); + } } - // Removing here. Registration into the map has to make sure to put - if (bMap.isEmpty()) { - nodeMap.remove(llapNodeId); - } + // Remove the container mapping if (matched != null) {