diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 24eed89..c01225b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1232,6 +1232,34 @@
public static final String DEFAULT_SCM_STORE_IMPL =
"org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore";
+ // Cleaner configs
+ private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX
+ + "cleaner.";
+
+ /** The config key for the shared cache staleness criteria. */
+ public static final String SCM_CLEANER_STALENESS_MINUTES = SCM_CLEANER_PREFIX
+ + "staleness.minutes";
+ public static final int DEFAULT_SCM_CLEANER_STALENESS_MINUTES = 7 * 24 * 60;
+
+ /**
+ * The config key for how often the cleaner service for the shared cache runs.
+ */
+ public static final String SCM_CLEANER_CLEANING_PERIOD_MINUTES =
+ SCM_CLEANER_PREFIX + "period.minutes";
+ public static final int DEFAULT_SCM_CLEANER_CLEANING_PERIOD_MINUTES = 24 * 60;
+
+ /** The config key for the initial delay in starting the cleaner service. */
+ public static final String SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES =
+ SCM_CLEANER_PREFIX + "initial.delay.minutes";
+ public static final int DEFAULT_SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES =
+ 10;
+
+ /** The config key for sleep time between cleaning each directory. */
+ public static final String SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS =
+ SCM_CLEANER_PREFIX + "sleep.between.clean.ms";
+ public static final long DEFAULT_SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS =
+ 0L;
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 70b405d..78ab1b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1280,13 +1280,42 @@
   <name>yarn.sharedcache.nested.level</name>
   <value>3</value>
-
+
   <description>The implementation to be used for the SCM store</description>
   <name>yarn.sharedcache.manager.store.impl</name>
   <value>org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore</value>
+  <property>
+    <description>The staleness period in minutes beyond which shared cache
+    entries may be deleted. It must be positive. It is 7 days (10080 minutes)
+    by default.</description>
+    <name>yarn.sharedcache.cleaner.staleness.minutes</name>
+    <value>10080</value>
+  </property>
+
+  <property>
+    <description>How often the cleaner service for the shared cache runs
+    (minutes). It must be positive. It is 1 day by default.</description>
+    <name>yarn.sharedcache.cleaner.period.minutes</name>
+    <value>1440</value>
+  </property>
+
+  <property>
+    <description>The initial delay in starting the cleaner service (minutes). It
+    must be non-negative. It is 10 minutes by default.</description>
+    <name>yarn.sharedcache.cleaner.initial.delay.minutes</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>Sleep time between cleaning each directory (ms). It must be
+    non-negative. It is 0 by default.</description>
+    <name>yarn.sharedcache.cleaner.sleep.between.clean.ms</name>
+    <value>0</value>
+  </property>
+
   <description>The interval that the yarn client library uses to poll the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
new file mode 100644
index 0000000..4fd5390
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cleaner service that maintains the shared cache area, and cleans up stale
+ * entries on a regular basis.
+ */
+public class CleanerService extends AbstractService {
+ /**
+ * Priority of the cleaner shutdown hook.
+ */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+ /**
+ * The name of the global cleaner lock that the cleaner creates to indicate
+ * that a cleaning process is in progress.
+ */
+ public static final String GLOBAL_CLEANER_PID = ".cleaner_pid";
+
+ private static final Log LOG = LogFactory.getLog(CleanerService.class);
+
+ private Configuration conf;
+ private final AppChecker appChecker;
+ private CleanerMetrics metrics;
+ private ScheduledExecutorService scheduler;
+ private final SCMStore store;
+ private final SCMContext context;
+ private final AtomicBoolean cleanerTaskRunning;
+
+ public CleanerService(AppChecker appChecker, SCMStore store,
+ SCMContext context) {
+ super("CleanerService");
+ this.appChecker = appChecker;
+ this.store = store;
+ this.context = context;
+ this.cleanerTaskRunning = new AtomicBoolean();
+ }
+
+ @Override
+ protected synchronized void serviceInit(Configuration conf) throws Exception {
+ this.conf = conf;
+ // create a single threaded scheduler service that services the cleaner task
+ ThreadFactory tf =
+ new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected synchronized void serviceStart() throws Exception {
+ if (!writeGlobalCleanerPidFile()) {
+ throw new YarnException("The global cleaner pid file already exists!");
+ }
+
+ long initialDelayInSeconds;
+ long periodInSeconds;
+
+ this.metrics = CleanerMetrics.initSingleton(conf);
+
+ int initialDelayInMinutes =
+ conf.getInt(YarnConfiguration.SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES,
+ YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES);
+ // negative value is invalid; use the default
+ if (initialDelayInMinutes < 0) {
+ LOG.warn("Negative initial delay value: " + initialDelayInMinutes +
+ ". The default will be used instead.");
+ initialDelayInMinutes =
+ YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES;
+ }
+ initialDelayInSeconds = TimeUnit.MINUTES.toSeconds(initialDelayInMinutes);
+ int periodInMinutes =
+ conf.getInt(YarnConfiguration.SCM_CLEANER_CLEANING_PERIOD_MINUTES,
+ YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_PERIOD_MINUTES);
+ // non-positive value is invalid; use the default
+ if (periodInMinutes <= 0) {
+ LOG.warn("Non-positive period value: " + periodInMinutes +
+ ". The default will be used instead.");
+ periodInMinutes = YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_PERIOD_MINUTES;
+ }
+ periodInSeconds = TimeUnit.MINUTES.toSeconds(periodInMinutes);
+
+ Runnable task = CleanerTask.create(conf, appChecker, store,
+ context, metrics, cleanerTaskRunning, true);
+ scheduler.scheduleAtFixedRate(task, initialDelayInSeconds, periodInSeconds,
+ TimeUnit.SECONDS);
+ LOG.info("Scheduled the shared cache cleaner task to run every " +
+ periodInSeconds + " seconds.");
+
+ super.serviceStart();
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ LOG.info("Shutting down the background thread.");
+ scheduler.shutdownNow();
+ try {
+ if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("Gave up waiting for the cleaner task to shutdown.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("The cleaner service was interrupted while shutting down the task.",
+ e);
+ }
+ LOG.info("The background thread stopped.");
+
+ removeGlobalCleanerPidFile();
+
+ super.serviceStop();
+ }
+
+ /**
+ * If no other cleaner task is running, execute an on-demand cleaner task.
+ *
+ * @return true if the cleaner task was started, false if there was already a
+ * cleaner task running.
+ */
+ protected boolean runCleanerTask() {
+
+ if (!this.cleanerTaskRunning.compareAndSet(false, true)) {
+ LOG.warn("A cleaner task is already running. "
+ + "A new on-demand cleaner task will not be submitted.");
+ return false;
+ }
+
+ Runnable task =
+ CleanerTask.create(conf, appChecker, store, context, metrics,
+ cleanerTaskRunning, false);
+ // this is a non-blocking call (it simply submits the task to the executor
+ // queue and returns)
+ this.scheduler.execute(task);
+ /*
+ * We return true if the task is accepted for execution by the executor. Two
+ * notes: 1. There is a small race here between a scheduled task and an
+ * on-demand task. If the scheduled task happens to start after we check/set
+ * cleanerTaskRunning, but before we call execute, we will get two tasks
+ * that run back to back. Luckily, since we have already set
+ * cleanerTaskRunning, the scheduled task will do nothing and the on-demand
+ * task will clean. 2. We do not need to set the cleanerTaskRunning boolean
+ * back to false because this will be done in the task itself.
+ */
+ return true;
+ }
+
+ /**
+ * To ensure there are not multiple instances of the SCM running on a given
+ * cluster, a global pid file is used. This file contains the hostname of the
+ * machine that owns the pid file.
+ *
+ * @return true if the pid file was written, false otherwise
+ * @throws YarnException if the pid file cannot be written due to an I/O error
+ */
+ private boolean writeGlobalCleanerPidFile() throws YarnException {
+ String root =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+ try {
+ FileSystem fs = FileSystem.get(this.conf);
+
+ if (fs.exists(pidPath)) {
+ return false;
+ }
+
+ FSDataOutputStream os = fs.create(pidPath, false);
+ // write the hostname and the process id in the global cleaner pid file
+ final String ID = ManagementFactory.getRuntimeMXBean().getName();
+ os.writeUTF(ID);
+ os.close();
+ // add it to the delete-on-exit to ensure it gets deleted when the JVM
+ // exits
+ fs.deleteOnExit(pidPath);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ LOG.info("Created the global cleaner pid file at " + pidPath.toString());
+ return true;
+ }
+
+ private void removeGlobalCleanerPidFile() {
+ try {
+ FileSystem fs = FileSystem.get(this.conf);
+ String root =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+ Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+
+
+ fs.delete(pidPath, false);
+ LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
+ } catch (IOException e) {
+ LOG.error(
+ "Unable to remove the global cleaner pid file! The file may need "
+ + "to be removed manually.", e);
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
new file mode 100644
index 0000000..7704dda
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.ResourceReference;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The task that runs and cleans up the shared cache area for stale entries and
+ * orphaned files. It is expected that only one cleaner task runs at any given
+ * point in time.
+ */
+class CleanerTask implements Runnable {
+ public static final String RENAMED_SUFFIX = "-renamed";
+ private static final Log LOG = LogFactory.getLog(CleanerTask.class);
+
+ private final String location;
+ private final int stalenessMinutes;
+ private final long sleepTime;
+ private final int nestedLevel;
+ private final Path root;
+ private final FileSystem fs;
+ private final AppChecker appChecker;
+ private final SCMStore store;
+ private final SCMContext context;
+ private final CleanerMetrics metrics;
+ private final AtomicBoolean cleanerTaskIsRunning;
+ private final boolean isScheduledTask;
+
+ /**
+ * Creates a cleaner task based on the configuration. This is provided for
+ * convenience.
+ *
+ * @param conf
+ * @param appChecker
+ * @param store
+ * @param context
+ * @param metrics
+ * @param cleanerTaskRunning true if there is another cleaner task currently
+ * running
+ * @param isScheduledTask true if the task is a scheduled task
+ */
+ public static CleanerTask create(Configuration conf, AppChecker appChecker,
+ SCMStore store, SCMContext context, CleanerMetrics metrics,
+ AtomicBoolean cleanerTaskRunning, boolean isScheduledTask) {
+ try {
+ // get the root directory for the shared cache
+ String location =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ int stalenessMinutes =
+ conf.getInt(YarnConfiguration.SCM_CLEANER_STALENESS_MINUTES,
+ YarnConfiguration.DEFAULT_SCM_CLEANER_STALENESS_MINUTES);
+ // non-positive value is invalid; use the default
+ if (stalenessMinutes <= 0) {
+ LOG.warn("Non-positive staleness value: " + stalenessMinutes +
+ ". The default will be used instead.");
+ stalenessMinutes = YarnConfiguration.DEFAULT_SCM_CLEANER_STALENESS_MINUTES;
+ }
+ long sleepTime =
+ conf.getLong(YarnConfiguration.SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS,
+ YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS);
+ int nestedLevel = CacheStructureUtil.getCacheDepth(conf);
+ FileSystem fs = FileSystem.get(conf);
+
+ return new CleanerTask(location, stalenessMinutes, sleepTime,
+ nestedLevel, fs, appChecker, store, context, metrics,
+ cleanerTaskRunning, isScheduledTask);
+ } catch (IOException e) {
+ LOG.error("Unable to obtain the filesystem for the cleaner service", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /**
+ * Creates a cleaner task based on the root directory location and the
+ * filesystem.
+ */
+ CleanerTask(String location, int stalenessMinutes, long sleepTime,
+ int nestedLevel, FileSystem fs, AppChecker appChecker, SCMStore store,
+ SCMContext context, CleanerMetrics metrics,
+ AtomicBoolean cleanerTaskIsRunning, boolean isScheduledTask) {
+ this.location = location;
+ this.stalenessMinutes = stalenessMinutes;
+ this.sleepTime = sleepTime;
+ this.nestedLevel = nestedLevel;
+ this.root = new Path(location);
+ this.fs = fs;
+ this.store = store;
+ this.context = context;
+ this.appChecker = appChecker;
+ this.metrics = metrics;
+ this.cleanerTaskIsRunning = cleanerTaskIsRunning;
+ this.isScheduledTask = isScheduledTask;
+ }
+
+ public void run() {
+ // check if it is a scheduled task
+ if (isScheduledTask
+ && !this.cleanerTaskIsRunning.compareAndSet(false, true)) {
+ // this is a scheduled task and there is already another task running
+ LOG.warn("A cleaner task is already running. "
+ + "This scheduled cleaner task will do nothing.");
+ return;
+ }
+
+ try {
+ if (!fs.exists(root)) {
+ LOG.error("The shared cache root " + location + " was not found. "
+ + "The cleaner task will do nothing.");
+ return;
+ }
+
+ // if we are using the in-memory store, we need to ensure the initial list
+ // of active apps is exhausted before the cleaner task can really process
+ // the entries because we do not remember whether any of the active apps
+ // may have been using the shared cache entries this check is needed
+ // specifically for the default in-memory store
+ if (store instanceof InMemorySCMStore && !processInitialActiveApps()) {
+ return;
+ }
+
+ // we're now ready to process the shared cache area
+ process();
+ } catch (IOException e) {
+ LOG.error("Unexpected exception while initializing the cleaner task. "
+ + "This task will do nothing,", e);
+ } finally {
+ // this is set to false regardless of if it is a scheduled or on-demand
+ // task
+ this.cleanerTaskIsRunning.set(false);
+ }
+ }
+
+ /**
+ * Goes through the list of the apps, and removes apps that are no longer
+ * active.
+ *
+ * @return true if the list is null or empty; false otherwise
+ */
+ @VisibleForTesting
+ boolean processInitialActiveApps() {
+ Collection<ApplicationId> activeApps = context.getInitialActiveApps();
+ if (activeApps == null || activeApps.isEmpty()) {
+ // we're fine; there are no active apps that were running at the time of
+ // the service start
+ return true;
+ }
+ LOG.info("Looking into " + activeApps.size() +
+ " apps to see if they are still active");
+ Iterator<ApplicationId> it = activeApps.iterator();
+ while (it.hasNext()) {
+ ApplicationId id = it.next();
+ try {
+ if (!appChecker.appIsActive(id)) {
+ // remove it from the list
+ it.remove();
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while checking the app status; will leave the entry in",
+ e);
+ // continue
+ }
+ }
+ LOG.info("There are now " + activeApps.size() + " entries in the list");
+ return activeApps.isEmpty();
+ }
+
+ /**
+ * Sweeps and processes the shared cache area to clean up stale and orphaned
+ * files.
+ */
+ void process() {
+ // mark the beginning of the run in the metrics
+ metrics.reportCleaningStart();
+ try {
+ // now traverse individual directories and process them
+ // the directory structure is specified by the nested level parameter
+ // (e.g. 9/c/d/)
+ StringBuilder pattern = new StringBuilder();
+ for (int i = 0; i < nestedLevel; i++) {
+ pattern.append("*/");
+ }
+ pattern.append("*");
+ FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+ int numEntries = entries == null ? 0 : entries.length;
+ LOG.info("Processing " + numEntries + " entries");
+ long beginNano = System.nanoTime();
+ if (entries != null) {
+ for (FileStatus entry: entries) {
+ // check for interruption so it can abort in a timely manner in case
+ // of shutdown
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("The cleaner task was interrupted. Aborting.");
+ break;
+ }
+
+ if (entry.isDirectory()) {
+ processSingleEntry(entry);
+ } else {
+ LOG.warn("Invalid file at path " + entry.getPath().toString() +
+ " when a directory was expected");
+ }
+ // add sleep time between cleaning each directory if it is non-zero
+ if (sleepTime > 0) {
+ Thread.sleep(sleepTime);
+ }
+ }
+ }
+ long endNano = System.nanoTime();
+ long durationMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
+ LOG.info("Processed " + numEntries + " entries in " + durationMs +
+ " ms.");
+ } catch (IOException e1) {
+ LOG.error("Unable to complete the cleaner task", e1);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt(); // restore the interrupt
+ }
+ }
+
+ /**
+ * Returns a path for the root directory for the shared cache.
+ */
+ Path getRootPath() {
+ return root;
+ }
+
+ /**
+ * Processes a single shared cache entry directory.
+ */
+ void processSingleEntry(FileStatus entry) {
+ Path path = entry.getPath();
+ // indicates the processing status of the entry
+ TaskStatus taskStatus = TaskStatus.INIT;
+
+ // first, if the path ends with the renamed suffix, it indicates the
+ // directory was moved (as stale) but somehow not deleted (probably due to
+ // SCM failure); delete the directory
+ if (path.toString().endsWith(RENAMED_SUFFIX)) {
+ LOG.info("Found a renamed directory that was left undeleted at " +
+ path.toString() + ". Deleting.");
+ try {
+ if (fs.delete(path, true)) {
+ taskStatus = TaskStatus.DELETED;
+ }
+ } catch (IOException e) {
+ LOG.error("Error while processing an entry: " + path, e);
+ }
+ } else {
+ // this is the path to the cache entry directory
+ // the directory name is the cache entry key (i.e. checksum)
+ String key = path.getName();
+
+ try {
+ cleanDeadAppIds(key);
+ } catch (IOException e) {
+ LOG.error("Exception thrown while removing dead appIds.", e);
+ }
+
+ if (entryIsStale(key, entry)) {
+ try {
+ /*
+ * As a side note: store.removeKey(key) and
+ * removeEntryFromCacheFileSystem(path) do not have to be in a
+ * critical section. We will never receive a notify for this entry
+ * before the delete from the FS has happened because the rename on
+ * the node manager will fail. If the node manager uploads the file
+ * after it is deleted from the FS, we are ok and the key will simply
+ * get added back to the scm once a notification is received.
+ */
+ // remove the entry from scm
+ if (store.removeKey(key)) {
+ // remove the entry from the file system
+ boolean deleted = removeEntryFromCacheFileSystem(path);
+ if (deleted) {
+ taskStatus = TaskStatus.DELETED;
+ } else {
+ LOG.error("Failed to remove path from the file system."
+ + " Skipping this entry: " + path);
+ taskStatus = TaskStatus.ERROR;
+ }
+ } else {
+ // we did not delete the entry because it contained application ids
+ taskStatus = TaskStatus.PROCESSED;
+ }
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to remove path from the file system. Skipping this entry: "
+ + path, e);
+ taskStatus = TaskStatus.ERROR;
+ }
+ } else {
+ taskStatus = TaskStatus.PROCESSED;
+ }
+ }
+
+ // record the processing
+ switch (taskStatus) {
+ case DELETED:
+ metrics.reportAFileDelete();
+ break;
+ case PROCESSED:
+ metrics.reportAFileProcess();
+ break;
+ case ERROR:
+ // TODO add metric for reporting errors in processing an entry
+ break;
+ default:
+ LOG.error(
+ "Cleaner task ended with an invalid task status: " + taskStatus);
+ }
+ }
+
+ boolean entryIsStale(String key, FileStatus entry) {
+ long staleTime =
+ System.currentTimeMillis()
+ - TimeUnit.MINUTES.toMillis(stalenessMinutes);
+ long accessTime = store.getAccessTime(key);
+ if (accessTime == -1) {
+ // check modification time
+ long modTime = entry.getModificationTime();
+ // if modification time is older then SCM startup time, we need to just
+ // use SCM startup time as the last point of certainty
+ long lastUse =
+ modTime < context.getStartTime() ? context.getStartTime() : modTime;
+ return lastUse < staleTime;
+ } else {
+ // check access time
+ return accessTime < staleTime;
+ }
+ }
+
+ private boolean removeEntryFromCacheFileSystem(Path path) throws IOException {
+ // rename the directory to make the delete atomic
+ Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX);
+ if (fs.rename(path, renamedPath)) {
+ // the directory can be removed safely now
+ // log the original path
+ LOG.info("Deleting " + path.toString());
+ return fs.delete(renamedPath, true);
+ } else {
+ // we were unable to remove it for some reason: it's best to leave
+ // it at that
+ LOG.error("We were not able to rename the directory to "
+ + renamedPath.toString() + ". We will leave it intact.");
+ }
+ return false;
+ }
+
+ /**
+ * Delete all appIds for apps that are not in an end state (regardless of
+ * whether the entry is stale or not). If key or appId does not exist in scm
+ * do nothing.
+ *
+ * @param key checksum of entry
+ * @throws IOException
+ */
+ private void cleanDeadAppIds(String key) throws IOException {
+ Collection<ResourceReference> refs = store.getResourceReferences(key);
+ if (refs != null) {
+ Set<ResourceReference> refsToRemove = new HashSet<ResourceReference>();
+ for (ResourceReference r : refs) {
+ if (!appChecker.appIsActive(r.getAppId())) {
+ // app is dead
+ refsToRemove.add(r);
+ }
+ }
+ if (refsToRemove.size() > 0) {
+ store.removeResourceRefs(key, refsToRemove, false);
+ }
+ }
+ }
+
+ /**
+ * A status indicating what happened with the processing of a given cache
+ * entry.
+ */
+ private enum TaskStatus {
+ INIT,
+ /** Entry was successfully processed, but not deleted **/
+ PROCESSED,
+ /** Entry was successfully deleted **/
+ DELETED,
+ /** The cleaner task ran into an error while processing the entry **/
+ ERROR;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index fde4935..4c733aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -77,6 +77,9 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
this.store = createSCMStoreService(conf, context);
addService(store);
+ CleanerService cs = createCleanerService(appChecker, store, context);
+ addService(cs);
+
} catch (IOException e) {
LOG.error("Encountered unexpected exception while initializing the shared cache manager",
e);
@@ -184,6 +187,11 @@ private static SCMStore createSCMStoreService(Configuration conf,
return store;
}
+ private CleanerService createCleanerService(AppChecker appChecker,
+ SCMStore store, SCMContext context) {
+ return new CleanerService(appChecker, store, context);
+ }
+
@Override
protected synchronized void serviceStart() throws Exception {
// Start metrics
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
new file mode 100644
index 0000000..166d46a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * This class is for maintaining the various Cleaner activity statistics and
+ * publishing them through the metrics interfaces.
+ */
+@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn")
+public class CleanerMetrics {
+ public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
+ private final MetricsRegistry registry = new MetricsRegistry("cleaner");
+
+ enum Singleton {
+ INSTANCE;
+
+ CleanerMetrics impl;
+
+ synchronized CleanerMetrics init(Configuration conf) {
+ if (impl == null) {
+ impl = create(conf);
+ }
+ return impl;
+ }
+ }
+
+ public static CleanerMetrics initSingleton(Configuration conf) {
+ return Singleton.INSTANCE.init(conf);
+ }
+
+ public static CleanerMetrics getInstance() {
+ CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
+ if (topMetrics == null)
+ throw new IllegalStateException(
+ "The CleanerMetrics singleton instance is not initialized."
+ + " Have you called init first?");
+ return topMetrics;
+ }
+
+ @Metric("number of deleted files over all runs")
+ private MutableCounterLong totalDeletedFiles;
+
+ public long getTotalDeletedFiles() {
+ return totalDeletedFiles.value();
+ }
+
+ private @Metric("number of deleted files in the last run")
+ MutableGaugeLong deletedFiles;
+
+ public long getDeletedFiles() {
+ return deletedFiles.value();
+ }
+
+ private @Metric("number of processed files over all runs")
+ MutableCounterLong totalProcessedFiles;
+
+ public long getTotalProcessedFiles() {
+ return totalProcessedFiles.value();
+ }
+
+ private @Metric("number of processed files in the last run")
+ MutableGaugeLong processedFiles;
+
+ public long getProcessedFiles() {
+ return processedFiles.value();
+ }
+
+ private @Metric("Rate of deleting the files over all runs")
+ MutableGaugeInt totalDeleteRate;
+
+ public int getTotalDeleteRate() {
+ return totalDeleteRate.value();
+ }
+
+ private @Metric("Rate of deleting the files over the last run")
+ MutableGaugeInt deleteRate;
+
+ public int getDeleteRate() {
+ return deleteRate.value();
+ }
+
+ private @Metric("Rate of processing the files over all runs")
+ MutableGaugeInt totalProcessRate;
+
+ public int getTotalProcessRate() {
+ return totalProcessRate.value();
+ }
+
+ private @Metric("Rate of processing the files over the last run")
+ MutableGaugeInt processRate;
+
+ public int getProcessRate() {
+ return processRate.value();
+ }
+
+ CleanerMetrics() {
+ }
+
+ /**
+ * The metric source obtained after parsing the annotations
+ */
+ MetricsSource metricSource;
+
+ /**
+ * The start of the last run of cleaner in ms
+ */
+ private AtomicLong lastRunStart = new AtomicLong(System.currentTimeMillis());
+
+ /**
+ * The end of the last run of cleaner in ms
+ */
+ private AtomicLong lastRunEnd = new AtomicLong(System.currentTimeMillis());
+
+ /**
+ * The sum of the durations of the last runs
+ */
+ private AtomicLong sumOfPastPeriods = new AtomicLong(0);
+
+ public MetricsSource getMetricSource() {
+ return metricSource;
+ }
+
+ static CleanerMetrics create(Configuration conf) {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+
+ CleanerMetrics metricObject = new CleanerMetrics();
+ MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject);
+ final MetricsSource s = sb.build();
+ ms.register("cleaner", "The cleaner service of the shared cache", s);
+ metricObject.metricSource = s;
+ return metricObject;
+ }
+
+ /**
+ * Report a delete operation at the current system time
+ */
+ public void reportAFileDelete() {
+ long time = System.currentTimeMillis();
+ reportAFileDelete(time);
+ }
+
+ /**
+ * Report a delete operation at the specified time.
+ * Delete implies process as well.
+ * @param time
+ */
+ public void reportAFileDelete(long time) {
+ totalProcessedFiles.incr();
+ processedFiles.incr();
+ totalDeletedFiles.incr();
+ deletedFiles.incr();
+ updateRates(time);
+ lastRunEnd.set(time);
+ }
+
+ /**
+ * Report a process operation at the current system time
+ */
+ public void reportAFileProcess() {
+ long time = System.currentTimeMillis();
+ reportAFileProcess(time);
+ }
+
+ /**
+ * Report a process operation at the specified time
+ *
+ * @param time
+ */
+ public void reportAFileProcess(long time) {
+ totalProcessedFiles.incr();
+ processedFiles.incr();
+ updateRates(time);
+ lastRunEnd.set(time);
+ }
+
+ private void updateRates(long time) {
+ long startTime = lastRunStart.get();
+ long lastPeriod = time - startTime;
+ long sumPeriods = sumOfPastPeriods.get() + lastPeriod;
+ float lastRunProcessRate = ((float) processedFiles.value()) / lastPeriod;
+ processRate.set(ratePerMsToHour(lastRunProcessRate));
+ float totalProcessRateMs = ((float) totalProcessedFiles.value()) / sumPeriods;
+ totalProcessRate.set(ratePerMsToHour(totalProcessRateMs));
+ float lastRunDeleteRate = ((float) deletedFiles.value()) / lastPeriod;
+ deleteRate.set(ratePerMsToHour(lastRunDeleteRate));
+ float totalDeleteRateMs = ((float) totalDeletedFiles.value()) / sumPeriods;
+ totalDeleteRate.set(ratePerMsToHour(totalDeleteRateMs));
+ }
+
+ /**
+ * Report the start of a new run of the cleaner at the current system time
+ */
+ public void reportCleaningStart() {
+ long time = System.currentTimeMillis();
+ reportCleaningStart(time);
+ }
+
+ /**
+ * Report the start of a new run of the cleaner at the specified time
+ *
+ * @param time
+ */
+ public void reportCleaningStart(long time) {
+ long lastPeriod = lastRunEnd.get() - lastRunStart.get();
+ if (lastPeriod < 0) {
+ LOG.error("No operation since last start!");
+ lastPeriod = 0;
+ }
+ lastRunStart.set(time);
+ processedFiles.set(0);
+ deletedFiles.set(0);
+ processRate.set(0);
+ deleteRate.set(0);
+ sumOfPastPeriods.addAndGet(lastPeriod);
+ registry.tag(SessionId, Long.toString(time), true);
+ }
+
+ static int ratePerMsToHour(float ratePerMs) {
+ float ratePerHour = ratePerMs * 1000 * 3600;
+ return (int) ratePerHour;
+ }
+
+ public static boolean isCleanerMetricRecord(String name) {
+ return (name.startsWith("cleaner"));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java
new file mode 100644
index 0000000..84672e7
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+
+/**
+ * Very simple metric collector for the cleaner service
+ */
+public class CleanerMetricsCollector implements MetricsCollector {
+
+ /**
+ * A map from metric name to its most recently reported value
+ */
+ Map metrics = new HashMap();
+ String sessionId = null;
+
+ @Override
+ public MetricsRecordBuilder addRecord(String name) {
+ // For the cleaner service we need to record only the metrics destined for the
+ // cleaner service. We hence ignore the others.
+ if (CleanerMetrics.isCleanerMetricRecord(name)) {
+ return new CleanerMetricsRecordBuilder(metrics);
+ }
+ else
+ return new NullMetricsRecordBuilder();
+ }
+
+ @Override
+ public MetricsRecordBuilder addRecord(MetricsInfo info) {
+ return addRecord(info.name());
+ }
+
+ public Map getMetrics() {
+ return metrics;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ /**
+ * A builder which ignores all the added metrics, tags, etc.
+ */
+ class NullMetricsRecordBuilder extends MetricsRecordBuilder {
+
+ @Override
+ public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(MetricsTag tag) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(AbstractMetric metric) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder setContext(String value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+ return this;
+ }
+
+ @Override
+ public MetricsCollector parent() {
+ return CleanerMetricsCollector.this;
+ }
+ }
+
+ /**
+ * A builder which keeps track of only counters and gauges
+ */
+ class CleanerMetricsRecordBuilder extends NullMetricsRecordBuilder {
+ Map metricValueMap;
+
+ public CleanerMetricsRecordBuilder(Map metricValueMap) {
+ this.metricValueMap = metricValueMap;
+ }
+
+ @Override
+ public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+ if (MsInfo.SessionId.equals(info))
+ setSessionId(value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(MetricsTag tag) {
+ if (MsInfo.SessionId.equals(tag.info()))
+ setSessionId(tag.value());
+ return this;
+ }
+
+ private void setSessionId(String value) {
+ sessionId = value;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(AbstractMetric metric) {
+ metricValueMap.put(metric.name(), metric.value());
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+ metricValueMap.put(info.name(),value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+ metricValueMap.put(info.name(),value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+ metricValueMap.put(info.name(),value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+ metricValueMap.put(info.name(),value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+ metricValueMap.put(info.name(),value);
+ return this;
+ }
+
+ @Override
+ public MetricsCollector parent() {
+ return CleanerMetricsCollector.this;
+ }
+
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
new file mode 100644
index 0000000..6bffcc6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.junit.Test;
+
+public class TestCleanerTask {
+ private static final String ROOT =
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT;
+ private static final int STALENESS_MINUTES =
+ YarnConfiguration.DEFAULT_SCM_CLEANER_STALENESS_MINUTES;
+ private static final long SLEEP_TIME =
+ YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS;
+ private static final int NESTED_LEVEL =
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+
+ @Test
+ public void testNonExistentRoot() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+ SCMContext context = mock(SCMContext.class);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, context, metrics,
+ new AtomicBoolean());
+ // the shared cache root does not exist
+ when(fs.exists(task.getRootPath())).thenReturn(false);
+
+ task.run();
+
+ // process() should not be called
+ verify(task, never()).process();
+ }
+
+ @Test
+ public void testProcessFreshEntry() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+ SCMContext context = mock(SCMContext.class);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, context, metrics,
+ new AtomicBoolean());
+
+ // mock an entry whose modification timestamp is fresh
+ long staleModificationTime =
+ System.currentTimeMillis() -
+ TimeUnit.MINUTES.toMillis(STALENESS_MINUTES - 1);
+ FileStatus entry = mock(FileStatus.class);
+ when(fs.exists(new Path(ROOT))).thenReturn(true);
+ when(entry.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+ when(entry.getModificationTime()).thenReturn(staleModificationTime);
+ when(store.getAccessTime(isA(String.class))).thenReturn(-1L);
+
+ // process the entry
+ task.processSingleEntry(entry);
+
+ // the directory should not be renamed
+ verify(fs, never()).rename(eq(entry.getPath()), isA(Path.class));
+ // metrics should record a processed file (but not delete)
+ verify(metrics).reportAFileProcess();
+ verify(metrics, never()).reportAFileDelete();
+ }
+
+ @Test
+ public void testProcessStaleEntry() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ when(appChecker.appIsActive(isA(ApplicationId.class))).thenReturn(false);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+ SCMContext context = mock(SCMContext.class);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, context, metrics,
+ new AtomicBoolean());
+
+ // mock an entry whose modification timestamp is stale
+ long staleModificationTime =
+ System.currentTimeMillis() -
+ TimeUnit.MINUTES.toMillis(STALENESS_MINUTES + 1);
+ FileStatus entry = mock(FileStatus.class);
+ String file = ROOT + "/a/b/c/abc";
+ when(entry.getPath()).thenReturn(new Path(file));
+ when(entry.getModificationTime()).thenReturn(staleModificationTime);
+ when(store.getAccessTime(isA(String.class))).thenReturn(-1L);
+ // mock the task to determine that this entry is stale
+ doReturn(true).when(task).entryIsStale(isA(String.class),
+ isA(FileStatus.class));
+ when(store.removeKey(isA(String.class))).thenReturn(true);
+ // rename succeeds
+ when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+ // delete returns true
+ when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true);
+
+ // process the entry
+ task.processSingleEntry(entry);
+
+ // the directory should be renamed
+ verify(fs).rename(eq(entry.getPath()), isA(Path.class));
+ // metrics should record a deleted file
+ verify(metrics).reportAFileDelete();
+ verify(metrics, never()).reportAFileProcess();
+ }
+
+ private CleanerTask createSpiedTask(FileSystem fs, AppChecker appChecker,
+ SCMStore store, SCMContext context, CleanerMetrics metrics,
+ AtomicBoolean isCleanerRunning) {
+ return spy(new CleanerTask(ROOT, STALENESS_MINUTES, SLEEP_TIME,
+ NESTED_LEVEL, fs, appChecker, store, context, metrics,
+ isCleanerRunning, true));
+ }
+
+ @Test
+ public void testEntryIsInUseEntryIsRefreshed() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+ SCMContext context = mock(SCMContext.class);
+
+ // have store say the entry does not exist
+ when(store.getAccessTime(isA(String.class))).thenReturn(-1L);
+ // have the filesystem return a fresh modification timestamp
+ FileStatus status = mock(FileStatus.class);
+ when(status.getModificationTime()).thenReturn(System.currentTimeMillis());
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, context, metrics,
+ new AtomicBoolean());
+
+ // call entryIsInUse()
+ assertFalse(task.entryIsStale("fooKey", status));
+ }
+
+ @Test
+ public void testEntryIsInUseHasAnActiveApp() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+ SCMContext context = mock(SCMContext.class);
+
+ long staleModificationTime =
+ System.currentTimeMillis() -
+ TimeUnit.MINUTES.toMillis(STALENESS_MINUTES + 1);
+ FileStatus entry = mock(FileStatus.class);
+ when(entry.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+ when(entry.getModificationTime()).thenReturn(staleModificationTime);
+ when(store.getAccessTime(isA(String.class))).thenReturn(-1L);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, context, metrics,
+ new AtomicBoolean());
+
+ when(store.removeKey(isA(String.class))).thenReturn(false);
+
+ // process the entry
+ task.processSingleEntry(entry);
+
+ // metrics should record a processed file (but not delete)
+ verify(metrics).reportAFileProcess();
+ verify(metrics, never()).reportAFileDelete();
+ }
+
+ @Test
+ public void testProcessInitialActiveApps() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+
+ // prepare an initial active app list
+ List ids = new ArrayList();
+ // app id for which app checker will return active
+ ApplicationId id1 = mock(ApplicationId.class);
+ when(appChecker.appIsActive(id1)).thenReturn(true);
+ ids.add(id1);
+ // app id for which app checker will return inactive
+ ApplicationId id2 = mock(ApplicationId.class);
+ when(appChecker.appIsActive(id2)).thenReturn(false);
+ ids.add(id2);
+ SCMContext context = mock(SCMContext.class);
+ when(context.getInitialActiveApps()).thenReturn(ids);
+ // create the store
+ SCMStore store = new InMemorySCMStore(context);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, context, metrics,
+ new AtomicBoolean());
+ when(fs.exists(task.getRootPath())).thenReturn(true);
+
+ task.run();
+
+ // process() should not be called
+ verify(task, never()).process();
+ // the initial active app list should be down to 1
+ assertSame(1, ids.size());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
new file mode 100644
index 0000000..ddada63
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCleanerMetrics {
+
+ Configuration conf = new Configuration();
+ long currTime = System.currentTimeMillis();
+ // it is float to allow floating point rate calculation
+ float lastDuration = 0;
+ float totalDurations = 0;
+ CleanerMetrics cleanerMetrics;
+
+ @Before
+ public void init() {
+ CleanerMetrics.initSingleton(conf);
+ cleanerMetrics = CleanerMetrics.getInstance();
+ }
+
+ @Test
+ public void testMetricsOverMultiplePeriods() {
+ simulateACleanerRun();
+ assertMetrics(4, 4, 1, 1);
+ currTime += 1300;
+ simulateACleanerRun();
+ assertMetrics(4, 8, 1, 2);
+ }
+
+ public void simulateACleanerRun() {
+ long startTime = currTime;
+ cleanerMetrics.reportCleaningStart(currTime);
+ currTime += 9;
+ cleanerMetrics.reportAFileProcess(currTime);
+ cleanerMetrics.reportAFileDelete(currTime);
+ currTime++;
+ cleanerMetrics.reportAFileProcess(currTime);
+ cleanerMetrics.reportAFileProcess(currTime);
+ lastDuration = currTime - startTime;
+ totalDurations += lastDuration;
+ }
+
+ void assertMetrics(int proc, int totalProc, int del, int totalDel) {
+ Assert.assertEquals(
+ "Processed files in the last period are not measured correctly", proc,
+ cleanerMetrics.getProcessedFiles());
+ Assert.assertEquals("Total processed files are not measured correctly",
+ totalProc, cleanerMetrics.getTotalProcessedFiles());
+ Assert.assertEquals(
+ "Deleted files in the last period are not measured correctly", del,
+ cleanerMetrics.getDeletedFiles());
+ Assert.assertEquals("Total deleted files are not measured correctly",
+ totalDel, cleanerMetrics.getTotalDeletedFiles());
+
+ Assert
+ .assertEquals(
+ "Rate of processed files in the last period are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(proc / lastDuration),
+ cleanerMetrics.getProcessRate());
+ Assert.assertEquals(
+ "Rate of total processed files are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(totalProc / totalDurations),
+ cleanerMetrics.getTotalProcessRate());
+ Assert.assertEquals(
+ "Rate of deleted files in the last period are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(del / lastDuration),
+ cleanerMetrics.getDeleteRate());
+ Assert.assertEquals(
+ "Rate of total deleted files are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(totalDel / totalDurations),
+ cleanerMetrics.getTotalDeleteRate());
+ }
+}