diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 648f977f27..4fc479deb5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -448,42 +448,58 @@ private void setUpAcls(String path) throws Exception { private void addToCache(String path, String host, InstanceType instance) { instanceCacheLock.lock(); try { - putInInstanceCache(path, pathToInstanceCache, instance); - putInNodeCache(host, nodeToInstanceCache, instance); + pathToInstanceCache.put(path, instance); + addToNodeCache(host, instance); + // to avoid stale size() value, logging it under the lock + LOG.info("Added path={}, host={} instance={} to cache." + + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); } finally { instanceCacheLock.unlock(); } - LOG.debug("Added path={}, host={} instance={} to cache." - + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", - path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); } - private void removeFromCache(String path, String host) { + private void removeFromCache(String path, String host, final InstanceType instance) { instanceCacheLock.lock(); try { pathToInstanceCache.remove(path); - nodeToInstanceCache.remove(host); + removeFromNodeCache(host, instance); + // to avoid stale size() value, logging it under the lock + LOG.info("Removed path={}, host={} from cache." + + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); } finally { instanceCacheLock.unlock(); } - LOG.debug("Removed path={}, host={} from cache." - + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", - path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); - } - - private void putInInstanceCache(String key, Map cache, - InstanceType instance) { - cache.put(key, instance); } - private void putInNodeCache(String key, Map> cache, - InstanceType instance) { - Set instanceSet = cache.get(key); + private void addToNodeCache(String key, InstanceType instance) { + Set instanceSet = nodeToInstanceCache.get(key); if (instanceSet == null) { instanceSet = new HashSet<>(); - instanceSet.add(instance); } - cache.put(key, instanceSet); + // there can be multiple instances in same host (in case of kubernetes, same hostname pods are possible, one new + // pod and one old stale pod which will be removed after ZK session timeout) + instanceSet.add(instance); + nodeToInstanceCache.put(key, instanceSet); + } + + private void removeFromNodeCache(String key, InstanceType instance) { + // there could be multiple instances in the set matching the same hostname, we only remove the ones that matches + // the InstanceType object which contains the workerIdentity/UUID. + Set instanceSet = nodeToInstanceCache.get(key); + boolean removeKey = true; + if (instanceSet != null) { + if (instanceSet.remove(instance)) { + LOG.info("Removed instance: {} from cache for host: {}", instance, key); + } + if (!instanceSet.isEmpty()) { + removeKey = false; + } + } + if (removeKey) { + nodeToInstanceCache.remove(key); + } } protected final void populateCache(TreeCache instancesCache, boolean doInvokeListeners) { @@ -619,7 +635,7 @@ public void childEvent(final CuratorFramework client, } break; case NODE_REMOVED: - removeFromCache(childData.getPath(), instance.getHost()); + removeFromCache(childData.getPath(), instance.getHost(), instance); for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { listener.onRemove(instance, ephSeqVersion); } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java index f1feec7a9e..81df272f98 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.app.rm.node.ExtendedNodeId; class ContainerFactory { final ApplicationAttemptId customAppAttemptId; @@ -37,13 +38,14 @@ public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) { } public Container createContainer(Resource capability, Priority priority, String hostname, - int port, String nodeHttpAddress) { + int port, String nodeHttpAddress, final String nodeIdentity) { ContainerId containerId = ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance(hostname, port); + NodeId extendedNodeId = new ExtendedNodeId(nodeId, nodeIdentity); Container container = - Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null); + Container.newInstance(containerId, extendedNodeId, nodeHttpAddress, capability, priority, null); return container; } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 34aecfe52c..69a2b77e69 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -104,6 +104,7 @@ import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.impl.Edge; +import org.apache.tez.dag.app.rm.node.ExtendedNodeId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; @@ -844,11 +845,14 @@ public void onUpdate(LlapServiceInstance serviceInstance, int ephSeqVersion) { @Override public void onRemove(LlapServiceInstance serviceInstance, int ephSeqVersion) { NodeReport nodeReport = constructNodeReport(serviceInstance, false); - LOG.info("Sending out nodeReport for onRemove: {}", nodeReport); + LOG.info("Sending out nodeReport for onRemove: {} workerIdentity: {}", nodeReport, serviceInstance.getWorkerIdentity()); getContext().nodesUpdated(Collections.singletonList(nodeReport)); instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); - LOG.info("Removed node with identity: {} due to RegistryNotification. currentActiveInstances={}", - serviceInstance.getWorkerIdentity(), activeInstances.size()); + Set activeInstancesInHost = activeInstances.getByHost(serviceInstance.getHost()); + final int activeInstancesInHostSize = activeInstancesInHost == null ? 0 : activeInstancesInHost.size(); + LOG.info("Removed node with identity: {} host: {} due to RegistryNotification. currentActiveInstances={} " + + "activeInstancesInHost: {}", serviceInstance.getWorkerIdentity(), serviceInstance.getHost(), + activeInstances.size(), activeInstancesInHostSize); if (metrics != null) { metrics.setClusterNodeCount(activeInstances.size()); } @@ -1594,8 +1598,10 @@ private void addNode(NodeInfo node, LlapServiceInstance serviceInstance) { metrics.setClusterNodeCount(activeInstances.size()); } // Trigger scheduling since a new node became available. - LOG.info("Adding new node: {}. TotalNodeCount={}. activeInstances.size={}", - node, instanceToNodeMap.size(), activeInstances.size()); + Set activeInstancesInHost = activeInstances.getByHost(node.getHost()); + final int activeInstancesInHostSize = activeInstancesInHost == null ? 0 : activeInstancesInHost.size(); + LOG.info("Adding new node: {}. TotalNodeCount={}. activeInstances.size={} activeInstancesInHost.size={}", + node, instanceToNodeMap.size(), activeInstances.size(), activeInstancesInHostSize); trySchedulingPendingTasks(); } @@ -1663,8 +1669,9 @@ private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) { private static NodeReport constructNodeReport(LlapServiceInstance serviceInstance, boolean healthy) { - NodeReport nodeReport = NodeReport.newInstance(NodeId - .newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort()), + NodeId nodeId = new ExtendedNodeId(NodeId + .newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort()), serviceInstance.getWorkerIdentity()); + NodeReport nodeReport = NodeReport.newInstance(nodeId, healthy ? NodeState.RUNNING : NodeState.LOST, serviceInstance.getServicesAddress(), null, null, null, 0, "", 0l); @@ -2021,7 +2028,8 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource, containerFactory.createContainer(nodeInfo.getResourcePerExecutor(), taskInfo.priority, nodeInfo.getHost(), nodeInfo.getRpcPort(), - nodeInfo.getServiceAddress()); + nodeInfo.getServiceAddress(), + nodeInfo.getNodeIdentity()); writeLock.lock(); // While updating local structures // Note: this is actually called under the epic writeLock in schedulePendingTasks try {