diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index ad24c62..983d90f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -71,6 +72,7 @@ final ConcurrentMap localrsrc; private Configuration conf; private LocalDirsHandlerService dirsHandler; + private DeletionService delService; /* * This flag controls whether this resource tracker uses hierarchical * directories or not. For PRIVATE and PUBLIC resource trackers it @@ -103,6 +105,16 @@ public LocalResourcesTrackerImpl(String user, ApplicationId appId, useLocalCacheDirectoryManager, conf, stateStore, dirHandler); } + public LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, boolean useLocalCacheDirectoryManager, + Configuration conf, NMStateStoreService stateStore, + LocalDirsHandlerService dirHandler,DeletionService delService) { + this(user, appId, dispatcher, + new ConcurrentHashMap(), + useLocalCacheDirectoryManager, conf, stateStore, dirHandler); + this.delService = delService; + } + LocalResourcesTrackerImpl(String user, ApplicationId appId, Dispatcher dispatcher, ConcurrentMap localrsrc, @@ -183,6 +195,12 @@ public synchronized void handle(ResourceEvent event) { if (rsrc == null) { LOG.warn("Received " + event.getType() + " event for request " + req + " but localized resource is missing"); + if (event.getType() == ResourceEventType.LOCALIZED && delService != null) { + ResourceLocalizedEvent localizedEvent = (ResourceLocalizedEvent) event; + FileDeletionTask deletionTask = new FileDeletionTask(delService, + getUser(), getPathToDelete(localizedEvent.getLocation()), null); + delService.delete(deletionTask); + } return; } rsrc.handle(event); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index d9b887f..1efdba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -247,7 +247,7 @@ private void validateConf(Configuration conf) { public void serviceInit(Configuration conf) throws Exception { this.validateConf(conf); this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher, - true, conf, stateStore, dirsHandler); + true, conf, stateStore, dirsHandler, delService); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { @@ -311,7 +311,7 @@ public void recoverLocalizedResources(RecoveredLocalizationState state) if (!trackerState.isEmpty()) { LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, null, dispatcher, true, super.getConfig(), stateStore, - dirsHandler); + dirsHandler, delService); LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker); if (oldTracker != null) { @@ -467,11 +467,12 @@ private void handleInitApplicationResources(Application app) { // 0) Create application tracking structs String userName = app.getUser(); privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, - null, dispatcher, true, super.getConfig(), stateStore, dirsHandler)); + null, dispatcher, true, super.getConfig(), stateStore, dirsHandler, + delService)); String appIdStr = app.getAppId().toString(); appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(), app.getAppId(), dispatcher, false, super.getConfig(), stateStore, - dirsHandler)); + dirsHandler, delService)); // 1) Signal container init // // This is handled by the ApplicationImpl state machine and allows