diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 574e091..18518c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -602,11 +602,14 @@ public ContainerLaunchContext getLaunchContext() { public ContainerStatus cloneAndGetContainerStatus() { this.readLock.lock(); try { - ContainerStatus status = BuilderUtils.newContainerStatus(this.containerId, - getCurrentState(), diagnostics.toString(), exitCode, getResource(), - this.containerTokenIdentifier.getExecutionType()); + ContainerStatus status = BuilderUtils + .newContainerStatus(this.containerId, getCurrentState(), + diagnostics.toString(), exitCode, getResource(), + this.containerTokenIdentifier.getExecutionType()); status.setIPs(ips == null ? null : Arrays.asList(ips.split(","))); status.setHost(host); + status + .setLocalizationStatus(this.resourceSet.getLocalizationStatusList()); return status; } finally { this.readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java index d23bf76..e0cc573 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java @@ -23,18 +23,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.util.Records; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + /** * All Resources requested by the container. */ @@ -125,20 +130,24 @@ */ public Set resourceLocalized(LocalResourceRequest request, Path location) { - Set symlinks = pendingResources.remove(request); - if (symlinks == null) { - return null; - } else { - for (String symlink : symlinks) { - localizedResources.put(symlink, location); + synchronized (pendingResources) { + Set symlinks = pendingResources.remove(request); + if (symlinks == null) { + return null; + } else { + for (String symlink : symlinks) { + localizedResources.put(symlink, location); + } + return symlinks; } - return symlinks; } } public void resourceLocalizationFailed(LocalResourceRequest request) { - pendingResources.remove(request); - resourcesFailedToBeLocalized.add(request); + synchronized (pendingResources) { + pendingResources.remove(request); + resourcesFailedToBeLocalized.add(request); + } } public synchronized Map getLocalizationStatusList() { + + Map> allResource = + getAllResourcesByVisibility(); + + List localizationStatusList = + new ArrayList(); + + try { + if (allResource != null && !allResource.isEmpty()) { + Iterator>> + entries = allResource.entrySet().iterator(); + while (entries.hasNext()) { + Map.Entry> + entry = entries.next(); + + Collection tmpResourceList = entry.getValue(); + List tmpLocalizationStatusList = + new ArrayList(); + for (LocalResourceRequest req : tmpResourceList) { + LocalizationState state = LocalizationState.COMPLETED; + synchronized (pendingResources) { + state = (pendingResources.containsKey(req) ? + LocalizationState.PENDING : + resourcesFailedToBeLocalized.contains(req) ? + LocalizationState.FAILED : + LocalizationState.COMPLETED); + } + LocalizationStatus localizationStatus = + Records.newRecord(LocalizationStatus.class); + localizationStatus.setLocalizationState(state); + localizationStatus.setLocalResource(LocalResource + .newInstance(req.getResource(), req.getType(), + req.getVisibility(), req.getSize(), req.getTimestamp())); + tmpLocalizationStatusList.add(localizationStatus); + + } + localizationStatusList.addAll(tmpLocalizationStatusList); + } + + } + } catch(Exception e) { + + LOG.error(e); + + } + return localizationStatusList; + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index c973911..fcc3047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -192,6 +192,7 @@ public void setup() throws IOException { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + // Default delSrvc exec = createContainerExecutor(); delSrvc = createDeletionService(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 8e7b628..1cf3641 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; @@ -955,6 +957,8 @@ public void testLocalingResourceWhileContainerRunning() throws Exception { // Localize new local resources while container is running Map localResource2 = setupLocalResources("file2", "symLink2"); + Thread.sleep(3000); + ResourceLocalizationRequest request = ResourceLocalizationRequest.newInstance(cId, localResource2); @@ -964,6 +968,19 @@ public void testLocalingResourceWhileContainerRunning() throws Exception { GenericTestUtils.waitFor(new Supplier() { public Boolean get() { try { + List list = new ArrayList(); + list.add(cId); + GetContainerStatusesRequest request = + GetContainerStatusesRequest.newInstance(list); + GetContainerStatusesResponse response = + containerManager.getContainerStatuses(request); + ContainerStatus status = response.getContainerStatuses().get(0); + + List ls = status.getLocalizationStatus(); + for (LocalizationStatus l : ls) { + LocalizationState state = l.getLocalizationState(); + Assert.assertEquals(state, LocalizationState.COMPLETED); + } checkResourceLocalized(cId, "symLink2"); return true;