diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f183a90..43c6325 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1374,6 +1374,34 @@
public static final String IN_MEMORY_CHECK_PERIOD =
IN_MEMORY_STORE_PREFIX + "check-period";
public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+
+ // SCM Cleaner service configuration
+
+ private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX
+ + "cleaner.";
+
+ /**
+ * The frequency at which a cleaner task runs. Specified in minutes.
+ */
+ public static final String SCM_CLEANER_PERIOD =
+ SCM_CLEANER_PREFIX + "period";
+ public static final int DEFAULT_SCM_CLEANER_PERIOD = 24 * 60;
+
+ /**
+ * Initial delay before the first cleaner task is scheduled. Specified in
+ * minutes.
+ */
+ public static final String SCM_CLEANER_INITIAL_DELAY =
+ SCM_CLEANER_PREFIX + "initial-delay";
+ public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY = 10;
+
+ /**
+ * The time to sleep between processing each shared cache resource. Specified
+ * in milliseconds.
+ */
+ public static final String SCM_CLEANER_RESOURCE_SLEEP =
+ SCM_CLEANER_PREFIX + "resource-sleep";
+ public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP = 0L;
////////////////////////////////
// Other Configs
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5d53191..c24e731 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1376,6 +1376,27 @@
720
+
+  <property>
+    <description>The frequency at which a cleaner task runs.
+    Specified in minutes.</description>
+    <name>yarn.sharedcache.cleaner.period</name>
+    <value>1440</value>
+  </property>
+
+  <property>
+    <description>Initial delay before the first cleaner task is scheduled.
+    Specified in minutes.</description>
+    <name>yarn.sharedcache.cleaner.initial-delay</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>The time to sleep between processing each shared cache
+    resource. Specified in milliseconds.</description>
+    <name>yarn.sharedcache.cleaner.resource-sleep</name>
+    <value>0</value>
+  </property>
+
The interval that the yarn client library uses to poll the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
new file mode 100644
index 0000000..96a89ee
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cleaner service that maintains the shared cache area, and cleans up stale
+ * entries on a regular basis.
+ */
+@Private
+@Evolving
+public class CleanerService extends CompositeService {
+ /**
+ * Priority of the cleaner shutdown hook.
+ */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+ /**
+ * The name of the global cleaner lock that the cleaner creates to indicate
+ * that a cleaning process is in progress.
+ */
+ public static final String GLOBAL_CLEANER_PID = ".cleaner_pid";
+
+ private static final Log LOG = LogFactory.getLog(CleanerService.class);
+
+ private Configuration conf;
+ private AppChecker appChecker;
+ private CleanerMetrics metrics;
+ private ScheduledExecutorService scheduler;
+ private final SCMStore store;
+ private final AtomicBoolean cleanerTaskRunning;
+
+ public CleanerService(SCMStore store) {
+ super("CleanerService");
+ this.store = store;
+ this.cleanerTaskRunning = new AtomicBoolean();
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.conf = conf;
+
+ appChecker = createAppCheckerService(conf);
+ addService(appChecker);
+
+ // create a single threaded scheduler service that services the cleaner task
+ ThreadFactory tf =
+ new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ if (!writeGlobalCleanerPidFile()) {
+ throw new YarnException("The global cleaner pid file already exists!");
+ }
+
+ this.metrics = CleanerMetrics.initSingleton(conf);
+
+ // Start dependent services (i.e. AppChecker)
+ super.serviceStart();
+
+ Runnable task =
+ CleanerTask.create(conf, appChecker, store, metrics,
+ cleanerTaskRunning, true);
+ long periodInMinutes = getPeriod(conf);
+ scheduler.scheduleAtFixedRate(task, getInitialDelay(conf), periodInMinutes,
+ TimeUnit.MINUTES);
+ LOG.info("Scheduled the shared cache cleaner task to run every "
+ + periodInMinutes + " minutes.");
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Shutting down the background thread.");
+ scheduler.shutdownNow();
+ try {
+ if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("Gave up waiting for the cleaner task to shutdown.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("The cleaner service was interrupted while shutting down the task.",
+ e);
+ }
+ LOG.info("The background thread stopped.");
+
+ removeGlobalCleanerPidFile();
+
+ super.serviceStop();
+ }
+
+ @VisibleForTesting
+ AppChecker createAppCheckerService(Configuration conf) {
+ return SharedCacheManager.createAppCheckerService(conf);
+ }
+
+ /**
+ * If no other cleaner task is running, execute an on-demand cleaner task.
+ *
+ * @return true if the cleaner task was started, false if there was already a
+ * cleaner task running.
+ */
+ protected boolean runCleanerTask() {
+
+ if (!this.cleanerTaskRunning.compareAndSet(false, true)) {
+ LOG.warn("A cleaner task is already running. "
+ + "A new on-demand cleaner task will not be submitted.");
+ return false;
+ }
+
+ Runnable task =
+ CleanerTask.create(conf, appChecker, store, metrics,
+ cleanerTaskRunning, false);
+ // this is a non-blocking call (it simply submits the task to the executor
+ // queue and returns)
+ this.scheduler.execute(task);
+ /*
+ * We return true if the task is accepted for execution by the executor. Two
+ * notes: 1. There is a small race here between a scheduled task and an
+ * on-demand task. If the scheduled task happens to start after we check/set
+ * cleanerTaskRunning, but before we call execute, we will get two tasks
+ * that run back to back. Luckily, since we have already set
+ * cleanerTaskRunning, the scheduled task will do nothing and the on-demand
+ * task will clean. 2. We do not need to set the cleanerTaskRunning boolean
+ * back to false because this will be done in the task itself.
+ */
+ return true;
+ }
+
+ /**
+ * To ensure there are not multiple instances of the SCM running on a given
+ * cluster, a global pid file is used. This file contains the hostname of the
+ * machine that owns the pid file.
+ *
+ * @return true if the pid file was written, false otherwise
+ * @throws YarnException wrapping any IOException raised while accessing the
+ *           shared cache filesystem
+ */
+ private boolean writeGlobalCleanerPidFile() throws YarnException {
+ String root =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+ try {
+ FileSystem fs = FileSystem.get(this.conf);
+
+ if (fs.exists(pidPath)) {
+ return false;
+ }
+
+ FSDataOutputStream os = fs.create(pidPath, false);
+ // write the hostname and the process id in the global cleaner pid file
+ final String ID = ManagementFactory.getRuntimeMXBean().getName();
+ os.writeUTF(ID);
+ os.close();
+ // add it to the delete-on-exit to ensure it gets deleted when the JVM
+ // exits
+ fs.deleteOnExit(pidPath);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ LOG.info("Created the global cleaner pid file at " + pidPath.toString());
+ return true;
+ }
+
+ private void removeGlobalCleanerPidFile() {
+ try {
+ FileSystem fs = FileSystem.get(this.conf);
+ String root =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+ Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+
+
+ fs.delete(pidPath, false);
+ LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
+ } catch (IOException e) {
+ LOG.error(
+ "Unable to remove the global cleaner pid file! The file may need "
+ + "to be removed manually.", e);
+ }
+ }
+
+  /**
+   * Reads the cleaner initial delay (in minutes) from the configuration.
+   *
+   * @param conf the configuration to read from
+   * @return the configured (or default) initial delay in minutes
+   * @throws HadoopIllegalArgumentException if the configured value is negative
+   */
+  private static int getInitialDelay(Configuration conf) {
+    int initialDelayInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY);
+    // a negative value is invalid; fail fast rather than scheduling with it
+    if (initialDelayInMinutes < 0) {
+      throw new HadoopIllegalArgumentException("Negative initial delay value: "
+          + initialDelayInMinutes
+          + ". The initial delay must be greater than or equal to zero.");
+    }
+    return initialDelayInMinutes;
+  }
+
+  /**
+   * Reads the cleaner period (in minutes) from the configuration.
+   *
+   * @param conf the configuration to read from
+   * @return the configured (or default) period in minutes
+   * @throws HadoopIllegalArgumentException if the configured value is not
+   *           strictly positive
+   */
+  private static int getPeriod(Configuration conf) {
+    int periodInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD);
+    // a non-positive period is invalid; fail fast rather than scheduling
+    if (periodInMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive period value: "
+          + periodInMinutes
+          + ". The cleaner period must be greater than zero.");
+    }
+    return periodInMinutes;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
new file mode 100644
index 0000000..c6cce01
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+
+/**
+ * The task that runs and cleans up the shared cache area for stale entries and
+ * orphaned files. It is expected that only one cleaner task runs at any given
+ * point in time.
+ */
+@Private
+@Evolving
+class CleanerTask implements Runnable {
+ public static final String RENAMED_SUFFIX = "-renamed";
+ private static final Log LOG = LogFactory.getLog(CleanerTask.class);
+
+ private final String location;
+ private final long sleepTime;
+ private final int nestedLevel;
+ private final Path root;
+ private final FileSystem fs;
+ private final AppChecker appChecker;
+ private final SCMStore store;
+ private final CleanerMetrics metrics;
+ private final AtomicBoolean cleanerTaskIsRunning;
+ private final boolean isScheduledTask;
+
+ /**
+ * Creates a cleaner task based on the configuration. This is provided for
+ * convenience.
+ *
+ * @param conf
+ * @param appChecker
+ * @param store
+ * @param metrics
+ * @param cleanerTaskRunning true if there is another cleaner task currently
+ * running
+ * @param isScheduledTask true if the task is a scheduled task
+ * @return an instance of a CleanerTask
+ */
+ public static CleanerTask create(Configuration conf, AppChecker appChecker,
+ SCMStore store, CleanerMetrics metrics, AtomicBoolean cleanerTaskRunning,
+ boolean isScheduledTask) {
+ try {
+ // get the root directory for the shared cache
+ String location =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+ long sleepTime =
+ conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP,
+ YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP);
+ int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+ FileSystem fs = FileSystem.get(conf);
+
+ return new CleanerTask(location, sleepTime, nestedLevel, fs, appChecker,
+ store, metrics, cleanerTaskRunning, isScheduledTask);
+ } catch (IOException e) {
+ LOG.error("Unable to obtain the filesystem for the cleaner service", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /**
+ * Creates a cleaner task based on the root directory location and the
+ * filesystem.
+ */
+ CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs,
+ AppChecker appChecker, SCMStore store, CleanerMetrics metrics,
+ AtomicBoolean cleanerTaskIsRunning, boolean isScheduledTask) {
+ this.location = location;
+ this.sleepTime = sleepTime;
+ this.nestedLevel = nestedLevel;
+ this.root = new Path(location);
+ this.fs = fs;
+ this.store = store;
+ this.appChecker = appChecker;
+ this.metrics = metrics;
+ this.cleanerTaskIsRunning = cleanerTaskIsRunning;
+ this.isScheduledTask = isScheduledTask;
+ }
+
+  /**
+   * Runs the cleaner task once. A scheduled task does nothing if another
+   * cleaner task (scheduled or on-demand) is already in progress; an on-demand
+   * task has already claimed the running flag before being submitted.
+   */
+  @Override
+  public void run() {
+    if (isScheduledTask
+        && !this.cleanerTaskIsRunning.compareAndSet(false, true)) {
+      // this is a scheduled task and there is already another task running
+      LOG.warn("A cleaner task is already running. "
+          + "This scheduled cleaner task will do nothing.");
+      return;
+    }
+
+    try {
+      if (!fs.exists(root)) {
+        LOG.error("The shared cache root " + location + " was not found. "
+            + "The cleaner task will do nothing.");
+        return;
+      }
+
+      // we're now ready to process the shared cache area
+      process();
+    } catch (IOException e) {
+      LOG.error("Unexpected exception while initializing the cleaner task. "
+          + "This task will do nothing.", e);
+    } finally {
+      // this is set to false regardless of if it is a scheduled or on-demand
+      // task
+      this.cleanerTaskIsRunning.set(false);
+    }
+  }
+
+ /**
+ * Sweeps and processes the shared cache area to clean up stale and orphaned
+ * files.
+ */
+ void process() {
+ // mark the beginning of the run in the metrics
+ metrics.reportCleaningStart();
+ try {
+ // now traverse individual directories and process them
+ // the directory structure is specified by the nested level parameter
+ // (e.g. 9/c/d/)
+ StringBuilder pattern = new StringBuilder();
+ for (int i = 0; i < nestedLevel; i++) {
+ pattern.append("*/");
+ }
+ pattern.append("*");
+ FileStatus[] resources =
+ fs.globStatus(new Path(root, pattern.toString()));
+ int numResources = resources == null ? 0 : resources.length;
+ LOG.info("Processing " + numResources + " resources in the shared cache");
+ long beginNano = System.nanoTime();
+ if (resources != null) {
+ for (FileStatus resource : resources) {
+ // check for interruption so it can abort in a timely manner in case
+ // of shutdown
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("The cleaner task was interrupted. Aborting.");
+ break;
+ }
+
+ if (resource.isDirectory()) {
+ processSingleResource(resource);
+ } else {
+ LOG.warn("Invalid file at path " + resource.getPath().toString()
+ +
+ " when a directory was expected");
+ }
+ // add sleep time between cleaning each directory if it is non-zero
+ if (sleepTime > 0) {
+ Thread.sleep(sleepTime);
+ }
+ }
+ }
+ long endNano = System.nanoTime();
+ long durationMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano);
+ LOG.info("Processed " + numResources + " resource(s) in " + durationMs
+ +
+ " ms.");
+ } catch (IOException e1) {
+ LOG.error("Unable to complete the cleaner task", e1);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt(); // restore the interrupt
+ }
+ }
+
+ /**
+ * Returns a path for the root directory for the shared cache.
+ */
+ Path getRootPath() {
+ return root;
+ }
+
+ /**
+ * Processes a single shared cache resource directory.
+ */
+ void processSingleResource(FileStatus resource) {
+ Path path = resource.getPath();
+ // indicates the processing status of the resource
+ ResourceStatus resourceStatus = ResourceStatus.INIT;
+
+ // first, if the path ends with the renamed suffix, it indicates the
+ // directory was moved (as stale) but somehow not deleted (probably due to
+ // SCM failure); delete the directory
+ if (path.toString().endsWith(RENAMED_SUFFIX)) {
+ LOG.info("Found a renamed directory that was left undeleted at " +
+ path.toString() + ". Deleting.");
+ try {
+ if (fs.delete(path, true)) {
+ resourceStatus = ResourceStatus.DELETED;
+ }
+ } catch (IOException e) {
+ LOG.error("Error while processing a shared cache resource: " + path, e);
+ }
+ } else {
+ // this is the path to the cache resource directory
+ // the directory name is the resource key (i.e. a unique identifier)
+ String key = path.getName();
+
+ try {
+ cleanResourceReferences(key);
+ } catch (YarnException e) {
+ LOG.error("Exception thrown while removing dead appIds.", e);
+ }
+
+ if (store.isResourceEvictable(key, resource)) {
+ try {
+ /*
+ * TODO See YARN-2663: There is a race condition between
+ * store.removeResource(key) and
+ * removeResourceFromCacheFileSystem(path) operations because they do
+ * not happen atomically and resources can be uploaded with different
+ * file names by the node managers.
+ */
+ // remove the resource from scm (checks for appIds as well)
+ if (store.removeResource(key)) {
+ // remove the resource from the file system
+ boolean deleted = removeResourceFromCacheFileSystem(path);
+ if (deleted) {
+ resourceStatus = ResourceStatus.DELETED;
+ } else {
+ LOG.error("Failed to remove path from the file system."
+ + " Skipping this resource: " + path);
+ resourceStatus = ResourceStatus.ERROR;
+ }
+ } else {
+ // we did not delete the resource because it contained application
+ // ids
+ resourceStatus = ResourceStatus.PROCESSED;
+ }
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to remove path from the file system. Skipping this resource: "
+ + path, e);
+ resourceStatus = ResourceStatus.ERROR;
+ }
+ } else {
+ resourceStatus = ResourceStatus.PROCESSED;
+ }
+ }
+
+ // record the processing
+ switch (resourceStatus) {
+ case DELETED:
+ metrics.reportAFileDelete();
+ break;
+ case PROCESSED:
+ metrics.reportAFileProcess();
+ break;
+ case ERROR:
+ metrics.reportAFileError();
+ break;
+ default:
+ LOG.error("Cleaner encountered an invalid status (" + resourceStatus
+ + ") while processing resource: " + path.getName());
+ }
+ }
+
+ private boolean removeResourceFromCacheFileSystem(Path path)
+ throws IOException {
+ // rename the directory to make the delete atomic
+ Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX);
+ if (fs.rename(path, renamedPath)) {
+ // the directory can be removed safely now
+ // log the original path
+ LOG.info("Deleting " + path.toString());
+ return fs.delete(renamedPath, true);
+ } else {
+ // we were unable to remove it for some reason: it's best to leave
+ // it at that
+ LOG.error("We were not able to rename the directory to "
+ + renamedPath.toString() + ". We will leave it intact.");
+ }
+ return false;
+ }
+
+  /**
+   * Clean all resource references to a cache resource that contain application
+   * ids pointing to finished applications. If the resource key does not exist,
+   * do nothing.
+   *
+   * @param key a unique identifier for a resource
+   * @throws YarnException if querying the app checker or updating the store
+   *           fails
+   */
+  private void cleanResourceReferences(String key) throws YarnException {
+    Collection<SharedCacheResourceReference> refs =
+        store.getResourceReferences(key);
+    if (!refs.isEmpty()) {
+      Set<SharedCacheResourceReference> refsToRemove =
+          new HashSet<SharedCacheResourceReference>();
+      for (SharedCacheResourceReference r : refs) {
+        if (!appChecker.isApplicationActive(r.getAppId())) {
+          // application in resource reference is dead, it is safe to remove
+          // the reference
+          refsToRemove.add(r);
+        }
+      }
+      if (refsToRemove.size() > 0) {
+        store.removeResourceReferences(key, refsToRemove, false);
+      }
+    }
+  }
+
+ /**
+ * A status indicating what happened with the processing of a given cache
+ * resource.
+ */
+ private enum ResourceStatus {
+ INIT,
+ /** Resource was successfully processed, but not deleted **/
+ PROCESSED,
+ /** Resource was successfully deleted **/
+ DELETED,
+ /** The cleaner task ran into an error while processing the resource **/
+ ERROR;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 2f3ddb1..3fdb588 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -64,6 +64,9 @@ protected void serviceInit(Configuration conf) throws Exception {
this.store = createSCMStoreService(conf);
addService(store);
+ CleanerService cs = createCleanerService(store);
+ addService(cs);
+
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -90,6 +93,10 @@ private static SCMStore createSCMStoreService(Configuration conf) {
return store;
}
+ private CleanerService createCleanerService(SCMStore store) {
+ return new CleanerService(store);
+ }
+
@Override
protected void serviceStop() throws Exception {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
new file mode 100644
index 0000000..18f3363
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * This class is for maintaining the various Cleaner activity statistics and
+ * publishing them through the metrics interfaces.
+ */
+@Private
+@Evolving
+@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn")
+public class CleanerMetrics {
+ public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
+ private final MetricsRegistry registry = new MetricsRegistry("cleaner");
+
+ enum Singleton {
+ INSTANCE;
+
+ CleanerMetrics impl;
+
+ synchronized CleanerMetrics init(Configuration conf) {
+ if (impl == null) {
+ impl = create(conf);
+ }
+ return impl;
+ }
+ }
+
+ public static CleanerMetrics initSingleton(Configuration conf) {
+ return Singleton.INSTANCE.init(conf);
+ }
+
+  /**
+   * Returns the singleton instance.
+   *
+   * @return the initialized CleanerMetrics singleton
+   * @throws IllegalStateException if {@code initSingleton} has not been called
+   */
+  public static CleanerMetrics getInstance() {
+    CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null) {
+      throw new IllegalStateException(
+          "The CleanerMetrics singleton instance is not initialized."
+              + " Have you called init first?");
+    }
+    return topMetrics;
+  }
+
+ @Metric("number of deleted files over all runs")
+ private MutableCounterLong totalDeletedFiles;
+
+ public long getTotalDeletedFiles() {
+ return totalDeletedFiles.value();
+ }
+
+ private @Metric("number of deleted files in the last run")
+ MutableGaugeLong deletedFiles;
+
+ public long getDeletedFiles() {
+ return deletedFiles.value();
+ }
+
+ private @Metric("number of processed files over all runs")
+ MutableCounterLong totalProcessedFiles;
+
+ public long getTotalProcessedFiles() {
+ return totalProcessedFiles.value();
+ }
+
+ private @Metric("number of processed files in the last run")
+ MutableGaugeLong processedFiles;
+
+ public long getProcessedFiles() {
+ return processedFiles.value();
+ }
+
+ @Metric("number of file errors over all runs")
+ private MutableCounterLong totalFileErrors;
+
+ public long getTotalFileErrors() {
+ return totalFileErrors.value();
+ }
+
+ private @Metric("number of file errors in the last run")
+ MutableGaugeLong fileErrors;
+
+ public long getFileErrors() {
+ return fileErrors.value();
+ }
+
+ private @Metric("Rate of deleting the files over all runs")
+ MutableGaugeInt totalDeleteRate;
+
+ public int getTotalDeleteRate() {
+ return totalDeleteRate.value();
+ }
+
+ private @Metric("Rate of deleting the files over the last run")
+ MutableGaugeInt deleteRate;
+
+ public int getDeleteRate() {
+ return deleteRate.value();
+ }
+
+  private @Metric("Rate of processing the files over all runs")
+  MutableGaugeInt totalProcessRate;
+
+  /** @return the overall file-processing rate (converted to per-hour). */
+  public int getTotalProcessRate() {
+    return totalProcessRate.value();
+  }
+
+  private @Metric("Rate of processing the files over the last run")
+  MutableGaugeInt processRate;
+
+  /** @return the file-processing rate of the last run (converted to per-hour). */
+  public int getProcessRate() {
+    return processRate.value();
+  }
+
+ private @Metric("Rate of file errors over all runs")
+ MutableGaugeInt totalErrorRate;
+
+ public int getTotalErrorRate() {
+ return totalErrorRate.value();
+ }
+
+ private @Metric("Rate of file errors over the last run")
+ MutableGaugeInt errorRate;
+
+ public int getErrorRate() {
+ return errorRate.value();
+ }
+
+ CleanerMetrics() {
+ }
+
+ /**
+ * The metric source obtained after parsing the annotations
+ */
+ MetricsSource metricSource;
+
+ /**
+ * The start of the last run of cleaner is ms
+ */
+ private AtomicLong lastRunStart = new AtomicLong(System.currentTimeMillis());
+
+ /**
+ * The end of the last run of cleaner is ms
+ */
+ private AtomicLong lastRunEnd = new AtomicLong(System.currentTimeMillis());
+
+ /**
+ * The sum of the durations of the last runs
+ */
+ private AtomicLong sumOfPastPeriods = new AtomicLong(0);
+
+ public MetricsSource getMetricSource() {
+ return metricSource;
+ }
+
+ /**
+ * Builds a CleanerMetrics object from its @Metric annotations and
+ * registers the resulting source with the default metrics system under
+ * the name "cleaner".
+ *
+ * @param conf currently unread here -- NOTE(review): confirm whether it
+ *          is intended to drive any metrics configuration
+ * @return the registered metrics object
+ */
+ static CleanerMetrics create(Configuration conf) {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+
+ CleanerMetrics metricObject = new CleanerMetrics();
+ MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject);
+ final MetricsSource s = sb.build();
+ ms.register("cleaner", "The cleaner service of truly shared cache", s);
+ metricObject.metricSource = s;
+ return metricObject;
+ }
+
+ /**
+ * Record a file deletion stamped with the current system time.
+ */
+ public void reportAFileDelete() {
+ reportAFileDelete(System.currentTimeMillis());
+ }
+
+ /**
+ * Report a delete operation at the specified time. A delete implies a
+ * process as well, so both the processed and the deleted counters
+ * (per-run and total) are incremented.
+ *
+ * @param time the timestamp of the operation, in ms
+ */
+ public void reportAFileDelete(long time) {
+ totalProcessedFiles.incr();
+ processedFiles.incr();
+ totalDeletedFiles.incr();
+ deletedFiles.incr();
+ // refresh the derived rate gauges and remember when we last did work
+ updateRates(time);
+ lastRunEnd.set(time);
+ }
+
+ /**
+ * Record a processed file stamped with the current system time.
+ */
+ public void reportAFileProcess() {
+ reportAFileProcess(System.currentTimeMillis());
+ }
+
+ /**
+ * Report a process operation (without a delete) at the specified time.
+ *
+ * @param time the timestamp of the operation, in ms
+ */
+ public void reportAFileProcess(long time) {
+ totalProcessedFiles.incr();
+ processedFiles.incr();
+ // refresh the derived rate gauges and remember when we last did work
+ updateRates(time);
+ lastRunEnd.set(time);
+ }
+
+ /**
+ * Record a file-processing error stamped with the current system time.
+ */
+ public void reportAFileError() {
+ reportAFileError(System.currentTimeMillis());
+ }
+
+ /**
+ * Report a process operation error at the specified time. An error also
+ * counts as a processed file, so the processed counters are incremented
+ * alongside the error counters.
+ *
+ * @param time the timestamp of the operation, in ms
+ */
+ public void reportAFileError(long time) {
+ totalProcessedFiles.incr();
+ processedFiles.incr();
+ totalFileErrors.incr();
+ fileErrors.incr();
+ // refresh the derived rate gauges and remember when we last did work
+ updateRates(time);
+ lastRunEnd.set(time);
+ }
+
+ /**
+ * Recompute the per-run and overall rate gauges as of the given time.
+ * Rates are stored in files per hour (see ratePerMsToHour).
+ *
+ * @param time the timestamp of the latest operation, in ms
+ */
+ private void updateRates(long time) {
+ long startTime = lastRunStart.get();
+ long lastPeriod = time - startTime;
+ // Guard against a non-positive period (an operation reported within the
+ // same millisecond the run started, or a clock anomaly): a float
+ // division by zero yields Infinity and the int cast in ratePerMsToHour
+ // would then report Integer.MAX_VALUE.
+ if (lastPeriod <= 0) {
+ lastPeriod = 1;
+ }
+ long sumPeriods = sumOfPastPeriods.get() + lastPeriod;
+ float lastRunProcessRate = ((float) processedFiles.value()) / lastPeriod;
+ processRate.set(ratePerMsToHour(lastRunProcessRate));
+ float totalProcessRateMs = ((float) totalProcessedFiles.value()) / sumPeriods;
+ totalProcessRate.set(ratePerMsToHour(totalProcessRateMs));
+ float lastRunDeleteRate = ((float) deletedFiles.value()) / lastPeriod;
+ deleteRate.set(ratePerMsToHour(lastRunDeleteRate));
+ float totalDeleteRateMs = ((float) totalDeletedFiles.value()) / sumPeriods;
+ totalDeleteRate.set(ratePerMsToHour(totalDeleteRateMs));
+ float lastRunErrorRateMs = ((float) fileErrors.value()) / lastPeriod;
+ errorRate.set(ratePerMsToHour(lastRunErrorRateMs));
+ float totalErrorRateMs = ((float) totalFileErrors.value()) / sumPeriods;
+ totalErrorRate.set(ratePerMsToHour(totalErrorRateMs));
+ }
+
+ /**
+ * Record the start of a new cleaner run stamped with the current system
+ * time.
+ */
+ public void reportCleaningStart() {
+ reportCleaningStart(System.currentTimeMillis());
+ }
+
+ /**
+ * Report the start of a new run of the cleaner at the specified time.
+ * Folds the duration of the previous run into the running total, resets
+ * the per-run counters and rate gauges, and tags the metrics record with
+ * the new run's start time as the session id.
+ *
+ * @param time the start timestamp of the new run, in ms
+ */
+ public void reportCleaningStart(long time) {
+ // lastRunEnd is only advanced by the report* methods; if it still
+ // trails lastRunStart, nothing was reported during the previous run
+ long lastPeriod = lastRunEnd.get() - lastRunStart.get();
+ if (lastPeriod < 0) {
+ LOG.error("No operation since last start!");
+ lastPeriod = 0;
+ }
+ lastRunStart.set(time);
+ // reset per-run state; the total* counters intentionally keep growing.
+ // NOTE(review): the per-run fileErrors counter is not reset here while
+ // processedFiles/deletedFiles are -- confirm whether that is intended
+ processedFiles.set(0);
+ deletedFiles.set(0);
+ processRate.set(0);
+ deleteRate.set(0);
+ errorRate.set(0);
+ sumOfPastPeriods.addAndGet(lastPeriod);
+ registry.tag(SessionId, Long.toString(time), true);
+ }
+
+ /**
+ * Convert a per-millisecond rate into a truncated per-hour rate
+ * (1000 ms/s * 3600 s/h).
+ */
+ static int ratePerMsToHour(float ratePerMs) {
+ return (int) (ratePerMs * 1000 * 3600);
+ }
+
+ /**
+ * Whether the given metrics record name belongs to the cleaner service
+ * (i.e. starts with the "cleaner" source name used at registration).
+ */
+ public static boolean isCleanerMetricRecord(String name) {
+ return name.startsWith("cleaner");
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java
new file mode 100644
index 0000000..d853922
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+
+/**
+ * Very simple metric collector for the cleaner service. It retains the
+ * last reported value of every metric coming from "cleaner" records and
+ * ignores all other records.
+ */
+@Private
+@Evolving
+public class CleanerMetricsCollector implements MetricsCollector {
+
+ /**
+ * The last reported value of each collected metric, keyed by metric
+ * name. A later report overwrites the earlier value for the same name.
+ */
+ Map<String, Number> metrics = new HashMap<String, Number>();
+ /** The session id tag of the last collected cleaner record, if any. */
+ String sessionId = null;
+
+ @Override
+ public MetricsRecordBuilder addRecord(String name) {
+ // For the cleaner service we need to record only the metrics destined
+ // for the cleaner service. We hence ignore the others.
+ if (CleanerMetrics.isCleanerMetricRecord(name)) {
+ return new CleanerMetricsRecordBuilder(metrics);
+ } else {
+ return new NullMetricsRecordBuilder();
+ }
+ }
+
+ @Override
+ public MetricsRecordBuilder addRecord(MetricsInfo info) {
+ return addRecord(info.name());
+ }
+
+ public Map<String, Number> getMetrics() {
+ return metrics;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ /**
+ * A builder which ignores all the added metrics, tags, etc.
+ */
+ class NullMetricsRecordBuilder extends MetricsRecordBuilder {
+
+ @Override
+ public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(MetricsTag tag) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(AbstractMetric metric) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder setContext(String value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+ return this;
+ }
+
+ @Override
+ public MetricsCollector parent() {
+ return CleanerMetricsCollector.this;
+ }
+ }
+
+ /**
+ * A builder which keeps track of only counters and gauges, and captures
+ * the SessionId tag into the enclosing collector.
+ */
+ @Private
+ @Evolving
+ class CleanerMetricsRecordBuilder extends NullMetricsRecordBuilder {
+ /** The destination map for the collected metric values. */
+ Map<String, Number> metricValueMap;
+
+ public CleanerMetricsRecordBuilder(Map<String, Number> metricValueMap) {
+ this.metricValueMap = metricValueMap;
+ }
+
+ @Override
+ public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+ if (MsInfo.SessionId.equals(info)) {
+ setSessionId(value);
+ }
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(MetricsTag tag) {
+ if (MsInfo.SessionId.equals(tag.info())) {
+ setSessionId(tag.value());
+ }
+ return this;
+ }
+
+ private void setSessionId(String value) {
+ sessionId = value;
+ }
+
+ @Override
+ public MetricsRecordBuilder add(AbstractMetric metric) {
+ metricValueMap.put(metric.name(), metric.value());
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+ metricValueMap.put(info.name(), value);
+ return this;
+ }
+
+ @Override
+ public MetricsCollector parent() {
+ return CleanerMetricsCollector.this;
+ }
+
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
new file mode 100644
index 0000000..c910211
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.junit.Test;
+
+/**
+ * Unit tests for the cleaner task, driven entirely through mocked
+ * FileSystem, AppChecker, SCMStore, and CleanerMetrics collaborators.
+ */
+public class TestCleanerTask {
+ private static final String ROOT =
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT;
+ private static final long SLEEP_TIME =
+ YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP;
+ private static final int NESTED_LEVEL =
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+
+ /** A missing shared-cache root must abort the run before processing. */
+ @Test
+ public void testNonExistentRoot() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean());
+ // the shared cache root does not exist
+ when(fs.exists(task.getRootPath())).thenReturn(false);
+
+ task.run();
+
+ // process() should not be called
+ verify(task, never()).process();
+ }
+
+ /** A non-evictable (fresh) resource is counted but never renamed. */
+ @Test
+ public void testProcessFreshResource() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean());
+
+ // mock a resource that is not evictable
+ when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+ .thenReturn(false);
+ FileStatus status = mock(FileStatus.class);
+ when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+
+ // process the resource
+ task.processSingleResource(status);
+
+ // the directory should not be renamed
+ verify(fs, never()).rename(eq(status.getPath()), isA(Path.class));
+ // metrics should record a processed file (but not delete)
+ verify(metrics).reportAFileProcess();
+ verify(metrics, never()).reportAFileDelete();
+ }
+
+ /**
+ * An evictable resource with no live applications is renamed away and
+ * reported as a delete.
+ */
+ @Test
+ public void testProcessEvictableResource() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ when(appChecker.isApplicationActive(isA(ApplicationId.class))).thenReturn(
+ false);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean());
+
+ // mock an evictable resource
+ when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+ .thenReturn(true);
+ FileStatus status = mock(FileStatus.class);
+ when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+ when(store.removeResource(isA(String.class))).thenReturn(true);
+ // rename succeeds
+ when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+ // delete returns true
+ when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true);
+
+ // process the resource
+ task.processSingleResource(status);
+
+ // the directory should be renamed
+ verify(fs).rename(eq(status.getPath()), isA(Path.class));
+ // metrics should record a deleted file
+ verify(metrics).reportAFileDelete();
+ verify(metrics, never()).reportAFileProcess();
+ }
+
+ // Builds a spied CleanerTask wired to the given mocks. NOTE(review): the
+ // trailing boolean presumably marks the task as a scheduled run -- confirm
+ // against the CleanerTask constructor.
+ private CleanerTask createSpiedTask(FileSystem fs, AppChecker appChecker,
+ SCMStore store, CleanerMetrics metrics, AtomicBoolean isCleanerRunning) {
+ return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, appChecker,
+ store, metrics, isCleanerRunning, true));
+ }
+
+ /**
+ * A stale resource whose store entry still has application ids must be
+ * kept (processed but not deleted).
+ */
+ @Test
+ public void testResourceIsInUseHasAnActiveApp() throws Exception {
+ FileSystem fs = mock(FileSystem.class);
+ AppChecker appChecker = mock(AppChecker.class);
+ CleanerMetrics metrics = mock(CleanerMetrics.class);
+ SCMStore store = mock(SCMStore.class);
+
+ FileStatus resource = mock(FileStatus.class);
+ when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+ // resource is stale
+ when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+ .thenReturn(true);
+ // but still has appIds
+ when(store.removeResource(isA(String.class))).thenReturn(false);
+
+ CleanerTask task =
+ createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean());
+
+ // process the resource
+ task.processSingleResource(resource);
+
+ // metrics should record a processed file (but not delete)
+ verify(metrics).reportAFileProcess();
+ verify(metrics, never()).reportAFileDelete();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
new file mode 100644
index 0000000..0aaa3bf
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for CleanerMetrics, simulating cleaner runs at controlled
+ * timestamps and checking both the counters and the derived rate gauges.
+ */
+public class TestCleanerMetrics {
+
+ Configuration conf = new Configuration();
+ // simulated clock, advanced manually by the test
+ long currTime = System.currentTimeMillis();
+ // it is float to allow floating point rate calculation
+ float lastDuration = 0;
+ float totalDurations = 0;
+ CleanerMetrics cleanerMetrics;
+
+ @Before
+ public void init() {
+ CleanerMetrics.initSingleton(conf);
+ cleanerMetrics = CleanerMetrics.getInstance();
+ }
+
+ @Test
+ public void testMetricsOverMultiplePeriods() {
+ simulateACleanerRun();
+ // per-run counts reset each run; totals accumulate across runs
+ assertMetrics(4, 4, 1, 1);
+ currTime += 1300;
+ simulateACleanerRun();
+ assertMetrics(4, 8, 1, 2);
+ }
+
+ /**
+ * Simulate one cleaner run: 4 processed files (one of them a delete)
+ * spread over 10 simulated milliseconds; track the run duration locally
+ * so assertMetrics can recompute the expected rates.
+ */
+ public void simulateACleanerRun() {
+ long startTime = currTime;
+ cleanerMetrics.reportCleaningStart(currTime);
+ currTime += 9;
+ cleanerMetrics.reportAFileProcess(currTime);
+ cleanerMetrics.reportAFileDelete(currTime);
+ currTime++;
+ cleanerMetrics.reportAFileProcess(currTime);
+ cleanerMetrics.reportAFileProcess(currTime);
+ lastDuration = currTime - startTime;
+ totalDurations += lastDuration;
+ }
+
+ /**
+ * Assert the per-run and total counters, plus the rate gauges derived
+ * from them and the locally tracked run durations.
+ *
+ * @param proc expected processed files in the last run
+ * @param totalProc expected processed files over all runs
+ * @param del expected deleted files in the last run
+ * @param totalDel expected deleted files over all runs
+ */
+ void assertMetrics(int proc, int totalProc, int del, int totalDel) {
+ assertEquals(
+ "Processed files in the last period are not measured correctly", proc,
+ cleanerMetrics.getProcessedFiles());
+ assertEquals("Total processed files are not measured correctly",
+ totalProc, cleanerMetrics.getTotalProcessedFiles());
+ assertEquals(
+ "Deleted files in the last period are not measured correctly", del,
+ cleanerMetrics.getDeletedFiles());
+ assertEquals("Total deleted files are not measured correctly",
+ totalDel, cleanerMetrics.getTotalDeletedFiles());
+
+ assertEquals(
+ "Rate of processed files in the last period are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(proc / lastDuration),
+ cleanerMetrics.getProcessRate());
+ assertEquals(
+ "Rate of total processed files are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(totalProc / totalDurations),
+ cleanerMetrics.getTotalProcessRate());
+ assertEquals(
+ "Rate of deleted files in the last period are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(del / lastDuration),
+ cleanerMetrics.getDeleteRate());
+ assertEquals(
+ "Rate of total deleted files are not measured correctly",
+ CleanerMetrics.ratePerMsToHour(totalDel / totalDurations),
+ cleanerMetrics.getTotalDeleteRate());
+ }
+}