diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 266932b..1fc5733 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1255,6 +1255,34 @@ public static final String DEFAULT_SCM_STORE_IMPL = "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore"; + // Cleaner configs + private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX + + "cleaner."; + + /** The config key for the shared cache staleness criteria. */ + public static final String SCM_CLEANER_STALENESS_MINUTES = SCM_CLEANER_PREFIX + + "staleness.minutes"; + public static final int DEFAULT_SCM_CLEANER_STALENESS_MINUTES = 7 * 24 * 60; + + /** + * The config key for how often the cleaner service for the shared cache runs. + */ + public static final String SCM_CLEANER_CLEANING_PERIOD_MINUTES = + SCM_CLEANER_PREFIX + "period.minutes"; + public static final int DEFAULT_SCM_CLEANER_CLEANING_PERIOD_MINUTES = 24 * 60; + + /** The config key for the initial delay in starting the cleaner service. */ + public static final String SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES = + SCM_CLEANER_PREFIX + "initial.delay.minutes"; + public static final int DEFAULT_SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES = + 10; + + /** The config key for sleep time between cleaning each directory. 
*/ + public static final String SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS = + SCM_CLEANER_PREFIX + "sleep.between.clean.ms"; + public static final long DEFAULT_SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS = + 0L; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e552850..53d78e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1341,13 +1341,42 @@ yarn.sharedcache.nested.level 3 - + The implementation to be used for the SCM store yarn.sharedcache.manager.store.impl org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore + + The staleness criteria in minutes beyond which shared cache + entries may be deleted. It must be positive. It is 7 days by + default. + yarn.sharedcache.cleaner.staleness.minutes + 10080 + + + + How often the cleaner service for the shared cache runs + (minutes). It must be positive. It is 1 day by default. + yarn.sharedcache.cleaner.period.minutes + 1440 + + + + The initial delay in starting the cleaner service (minutes). It + must be non-negative. + yarn.sharedcache.cleaner.initial.delay.minutes + 10 + + + + Sleep time between cleaning each directory (ms). It must be + non-negative. 
+ yarn.sharedcache.cleaner.sleep.between.clean.ms + 0 + + The interval that the yarn client library uses to poll the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java new file mode 100644 index 0000000..4fd5390 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * The cleaner service that maintains the shared cache area, and cleans up stale + * entries on a regular basis. + */ +public class CleanerService extends AbstractService { + /** + * Priority of the cleaner shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + /** + * The name of the global cleaner lock that the cleaner creates to indicate + * that a cleaning process is in progress. 
+ */ + public static final String GLOBAL_CLEANER_PID = ".cleaner_pid"; + + private static final Log LOG = LogFactory.getLog(CleanerService.class); + + private Configuration conf; + private final AppChecker appChecker; + private CleanerMetrics metrics; + private ScheduledExecutorService scheduler; + private final SCMStore store; + private final SCMContext context; + private final AtomicBoolean cleanerTaskRunning; + + public CleanerService(AppChecker appChecker, SCMStore store, + SCMContext context) { + super("CleanerService"); + this.appChecker = appChecker; + this.store = store; + this.context = context; + this.cleanerTaskRunning = new AtomicBoolean(); + } + + @Override + protected synchronized void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + // create a single threaded scheduler service that services the cleaner task + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); + scheduler = Executors.newSingleThreadScheduledExecutor(tf); + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStart() throws Exception { + if (!writeGlobalCleanerPidFile()) { + throw new YarnException("The global cleaner pid file already exists!"); + } + + long initialDelayInSeconds; + long periodInSeconds; + + this.metrics = CleanerMetrics.initSingleton(conf); + + int initialDelayInMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES, + YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES); + // negative value is invalid; use the default + if (initialDelayInMinutes < 0) { + LOG.warn("Negative initial delay value: " + initialDelayInMinutes + + ". 
The default will be used instead."); + initialDelayInMinutes = + YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_INITIAL_DELAY_MINUTES; + } + initialDelayInSeconds = TimeUnit.MINUTES.toSeconds(initialDelayInMinutes); + int periodInMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_CLEANING_PERIOD_MINUTES, + YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_PERIOD_MINUTES); + // non-positive value is invalid; use the default + if (periodInMinutes <= 0) { + LOG.warn("Non-positive period value: " + periodInMinutes + + ". The default will be used instead."); + periodInMinutes = YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_PERIOD_MINUTES; + } + periodInSeconds = TimeUnit.MINUTES.toSeconds(periodInMinutes); + + Runnable task = CleanerTask.create(conf, appChecker, store, + context, metrics, cleanerTaskRunning, true); + scheduler.scheduleAtFixedRate(task, initialDelayInSeconds, periodInSeconds, + TimeUnit.SECONDS); + LOG.info("Scheduled the shared cache cleaner task to run every " + + periodInSeconds + " seconds."); + + super.serviceStart(); + } + + @Override + protected synchronized void serviceStop() throws Exception { + LOG.info("Shutting down the background thread."); + scheduler.shutdownNow(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Gave up waiting for the cleaner task to shutdown."); + } + } catch (InterruptedException e) { + LOG.warn("The cleaner service was interrupted while shutting down the task.", + e); + } + LOG.info("The background thread stopped."); + + removeGlobalCleanerPidFile(); + + super.serviceStop(); + } + + /** + * If no other cleaner task is running, execute an on-demand cleaner task. + * + * @return true if the cleaner task was started, false if there was already a + * cleaner task running. + */ + protected boolean runCleanerTask() { + + if (!this.cleanerTaskRunning.compareAndSet(false, true)) { + LOG.warn("A cleaner task is already running. 
" + + "A new on-demand cleaner task will not be submitted."); + return false; + } + + Runnable task = + CleanerTask.create(conf, appChecker, store, context, metrics, + cleanerTaskRunning, false); + // this is a non-blocking call (it simply submits the task to the executor + // queue and returns) + this.scheduler.execute(task); + /* + * We return true if the task is accepted for execution by the executor. Two + * notes: 1. There is a small race here between a scheduled task and an + * on-demand task. If the scheduled task happens to start after we check/set + * cleanerTaskRunning, but before we call execute, we will get two tasks + * that run back to back. Luckily, since we have already set + * cleanerTaskRunning, the scheduled task will do nothing and the on-demand + * task will clean. 2. We do not need to set the cleanerTaskRunning boolean + * back to false because this will be done in the task itself. + */ + return true; + } + + /** + * To ensure there are not multiple instances of the SCM running on a given + * cluster, a global pid file is used. This file contains the hostname of the + * machine that owns the pid file. 
+ * + * @return true if the pid file was written, false otherwise + * @throws IOException + */ + private boolean writeGlobalCleanerPidFile() throws YarnException { + String root = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + Path pidPath = new Path(root, GLOBAL_CLEANER_PID); + try { + FileSystem fs = FileSystem.get(this.conf); + + if (fs.exists(pidPath)) { + return false; + } + + FSDataOutputStream os = fs.create(pidPath, false); + // write the hostname and the process id in the global cleaner pid file + final String ID = ManagementFactory.getRuntimeMXBean().getName(); + os.writeUTF(ID); + os.close(); + // add it to the delete-on-exit to ensure it gets deleted when the JVM + // exits + fs.deleteOnExit(pidPath); + } catch (IOException e) { + throw new YarnException(e); + } + LOG.info("Created the global cleaner pid file at " + pidPath.toString()); + return true; + } + + private void removeGlobalCleanerPidFile() { + try { + FileSystem fs = FileSystem.get(this.conf); + String root = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + Path pidPath = new Path(root, GLOBAL_CLEANER_PID); + + + fs.delete(pidPath, false); + LOG.info("Removed the global cleaner pid file at " + pidPath.toString()); + } catch (IOException e) { + LOG.error( + "Unable to remove the global cleaner pid file! 
The file may need " + + "to be removed manually.", e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java new file mode 100644 index 0000000..7704dda --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java @@ -0,0 +1,424 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.ResourceReference; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The task that runs and cleans up the shared cache area for stale entries and + * orphaned files. It is expected that only one cleaner task runs at any given + * point in time. + */ +class CleanerTask implements Runnable { + public static final String RENAMED_SUFFIX = "-renamed"; + private static final Log LOG = LogFactory.getLog(CleanerTask.class); + + private final String location; + private final int stalenessMinutes; + private final long sleepTime; + private final int nestedLevel; + private final Path root; + private final FileSystem fs; + private final AppChecker appChecker; + private final SCMStore store; + private final SCMContext context; + private final CleanerMetrics metrics; + private final AtomicBoolean cleanerTaskIsRunning; + private final boolean isScheduledTask; + + /** + * Creates a cleaner task based on the configuration. 
This is provided for + * convenience. + * + * @param conf + * @param appChecker + * @param store + * @param context + * @param metrics + * @param cleanerTaskRunning true if there is another cleaner task currently + * running + * @param isScheduledTask true if the task is a scheduled task + */ + public static CleanerTask create(Configuration conf, AppChecker appChecker, + SCMStore store, SCMContext context, CleanerMetrics metrics, + AtomicBoolean cleanerTaskRunning, boolean isScheduledTask) { + try { + // get the root directory for the shared cache + String location = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + int stalenessMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_STALENESS_MINUTES, + YarnConfiguration.DEFAULT_SCM_CLEANER_STALENESS_MINUTES); + // non-positive value is invalid; use the default + if (stalenessMinutes <= 0) { + LOG.warn("Non-positive staleness value: " + stalenessMinutes + + ". The default will be used instead."); + stalenessMinutes = YarnConfiguration.DEFAULT_SCM_CLEANER_STALENESS_MINUTES; + } + long sleepTime = + conf.getLong(YarnConfiguration.SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS, + YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS); + int nestedLevel = CacheStructureUtil.getCacheDepth(conf); + FileSystem fs = FileSystem.get(conf); + + return new CleanerTask(location, stalenessMinutes, sleepTime, + nestedLevel, fs, appChecker, store, context, metrics, + cleanerTaskRunning, isScheduledTask); + } catch (IOException e) { + LOG.error("Unable to obtain the filesystem for the cleaner service", e); + throw new ExceptionInInitializerError(e); + } + } + + /** + * Creates a cleaner task based on the root directory location and the + * filesystem. 
+ */ + CleanerTask(String location, int stalenessMinutes, long sleepTime, + int nestedLevel, FileSystem fs, AppChecker appChecker, SCMStore store, + SCMContext context, CleanerMetrics metrics, + AtomicBoolean cleanerTaskIsRunning, boolean isScheduledTask) { + this.location = location; + this.stalenessMinutes = stalenessMinutes; + this.sleepTime = sleepTime; + this.nestedLevel = nestedLevel; + this.root = new Path(location); + this.fs = fs; + this.store = store; + this.context = context; + this.appChecker = appChecker; + this.metrics = metrics; + this.cleanerTaskIsRunning = cleanerTaskIsRunning; + this.isScheduledTask = isScheduledTask; + } + + public void run() { + // check if it is a scheduled task + if (isScheduledTask + && !this.cleanerTaskIsRunning.compareAndSet(false, true)) { + // this is a scheduled task and there is already another task running + LOG.warn("A cleaner task is already running. " + + "This scheduled cleaner task will do nothing."); + return; + } + + try { + if (!fs.exists(root)) { + LOG.error("The shared cache root " + location + " was not found. " + + "The cleaner task will do nothing."); + return; + } + + // if we are using the in-memory store, we need to ensure the initial list + // of active apps is exhausted before the cleaner task can really process + // the entries because we do not remember whether any of the active apps + // may have been using the shared cache entries this check is needed + // specifically for the default in-memory store + if (store instanceof InMemorySCMStore && !processInitialActiveApps()) { + return; + } + + // we're now ready to process the shared cache area + process(); + } catch (IOException e) { + LOG.error("Unexpected exception while initializing the cleaner task. 
" + + "This task will do nothing,", e); + } finally { + // this is set to false regardless of if it is a scheduled or on-demand + // task + this.cleanerTaskIsRunning.set(false); + } + } + + /** + * Goes through the list of the apps, and removes apps that are no longer + * active. + * + * @return true if the list is null or empty; false otherwise + */ + @VisibleForTesting + boolean processInitialActiveApps() { + Collection activeApps = context.getInitialActiveApps(); + if (activeApps == null || activeApps.isEmpty()) { + // we're fine; there are no active apps that were running at the time of + // the service start + return true; + } + LOG.info("Looking into " + activeApps.size() + + " apps to see if they are still active"); + Iterator it = activeApps.iterator(); + while (it.hasNext()) { + ApplicationId id = it.next(); + try { + if (!appChecker.appIsActive(id)) { + // remove it from the list + it.remove(); + } + } catch (IOException e) { + LOG.warn("Exception while checking the app status; will leave the entry in", + e); + // continue + } + } + LOG.info("There are now " + activeApps.size() + " entries in the list"); + return activeApps.isEmpty(); + } + + /** + * Sweeps and processes the shared cache area to clean up stale and orphaned + * files. + */ + void process() { + // mark the beginning of the run in the metrics + metrics.reportCleaningStart(); + try { + // now traverse individual directories and process them + // the directory structure is specified by the nested level parameter + // (e.g. 9/c/d/) + StringBuilder pattern = new StringBuilder(); + for (int i = 0; i < nestedLevel; i++) { + pattern.append("*/"); + } + pattern.append("*"); + FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString())); + int numEntries = entries == null ? 
0 : entries.length; + LOG.info("Processing " + numEntries + " entries"); + long beginNano = System.nanoTime(); + if (entries != null) { + for (FileStatus entry: entries) { + // check for interruption so it can abort in a timely manner in case + // of shutdown + if (Thread.currentThread().isInterrupted()) { + LOG.warn("The cleaner task was interrupted. Aborting."); + break; + } + + if (entry.isDirectory()) { + processSingleEntry(entry); + } else { + LOG.warn("Invalid file at path " + entry.getPath().toString() + + " when a directory was expected"); + } + // add sleep time between cleaning each directory if it is non-zero + if (sleepTime > 0) { + Thread.sleep(sleepTime); + } + } + } + long endNano = System.nanoTime(); + long durationMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano); + LOG.info("Processed " + numEntries + " entries in " + durationMs + + " ms."); + } catch (IOException e1) { + LOG.error("Unable to complete the cleaner task", e1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); // restore the interrupt + } + } + + /** + * Returns a path for the root directory for the shared cache. + */ + Path getRootPath() { + return root; + } + + /** + * Processes a single shared cache entry directory. + */ + void processSingleEntry(FileStatus entry) { + Path path = entry.getPath(); + // indicates the processing status of the entry + TaskStatus taskStatus = TaskStatus.INIT; + + // first, if the path ends with the renamed suffix, it indicates the + // directory was moved (as stale) but somehow not deleted (probably due to + // SCM failure); delete the directory + if (path.toString().endsWith(RENAMED_SUFFIX)) { + LOG.info("Found a renamed directory that was left undeleted at " + + path.toString() + ". 
Deleting."); + try { + if (fs.delete(path, true)) { + taskStatus = TaskStatus.DELETED; + } + } catch (IOException e) { + LOG.error("Error while processing an entry: " + path, e); + } + } else { + // this is the path to the cache entry directory + // the directory name is the cache entry key (i.e. checksum) + String key = path.getName(); + + try { + cleanDeadAppIds(key); + } catch (IOException e) { + LOG.error("Exception thrown while removing dead appIds.", e); + } + + if (entryIsStale(key, entry)) { + try { + /* + * As a side note: store.removeKey(key) and + * removeEntryFromCacheFileSystem(path) do not have to be in a + * critical section. We will never receive a notify for this entry + * before the delete from the FS has happened because the rename on + * the node manager will fail. If the node manager uploads the file + * after it is deleted form the FS, we are ok and the key will simply + * get added back to the scm once a notification is received. + */ + // remove the entry from scm + if (store.removeKey(key)) { + // remove the entry from the file system + boolean deleted = removeEntryFromCacheFileSystem(path); + if (deleted) { + taskStatus = TaskStatus.DELETED; + } else { + LOG.error("Failed to remove path from the file system." + + " Skipping this entry: " + path); + taskStatus = TaskStatus.ERROR; + } + } else { + // we did not delete the entry because it contained application ids + taskStatus = TaskStatus.PROCESSED; + } + } catch (IOException e) { + LOG.error( + "Failed to remove path from the file system. 
Skipping this entry: " + + path, e); + taskStatus = TaskStatus.ERROR; + } + } else { + taskStatus = TaskStatus.PROCESSED; + } + } + + // record the processing + switch (taskStatus) { + case DELETED: + metrics.reportAFileDelete(); + break; + case PROCESSED: + metrics.reportAFileProcess(); + break; + case ERROR: + // TODO add metric for reporting errors in processing an entry + break; + default: + LOG.error( + "Cleaner task ended with an invalid task status: " + taskStatus); + } + } + + boolean entryIsStale(String key, FileStatus entry) { + long staleTime = + System.currentTimeMillis() + - TimeUnit.MINUTES.toMillis(stalenessMinutes); + long accessTime = store.getAccessTime(key); + if (accessTime == -1) { + // check modification time + long modTime = entry.getModificationTime(); + // if modification time is older then SCM startup time, we need to just + // use SCM startup time as the last point of certainty + long lastUse = + modTime < context.getStartTime() ? context.getStartTime() : modTime; + return lastUse < staleTime; + } else { + // check access time + return accessTime < staleTime; + } + } + + private boolean removeEntryFromCacheFileSystem(Path path) throws IOException { + // rename the directory to make the delete atomic + Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX); + if (fs.rename(path, renamedPath)) { + // the directory can be removed safely now + // log the original path + LOG.info("Deleting " + path.toString()); + return fs.delete(renamedPath, true); + } else { + // we were unable to remove it for some reason: it's best to leave + // it at that + LOG.error("We were not able to rename the directory to " + + renamedPath.toString() + ". We will leave it intact."); + } + return false; + } + + /** + * Delete all appIds for apps that are not in an end state (regardless of + * whether the entry is stale or not). If key or appId does not exist in scm + * do nothing. 
+ * + * @param key checksum of entry + * @throws IOException + */ + private void cleanDeadAppIds(String key) throws IOException { + Collection refs = store.getResourceReferences(key); + if (refs != null) { + Set refsToRemove = new HashSet(); + for (ResourceReference r : refs) { + if (!appChecker.appIsActive(r.getAppId())) { + // app is dead + refsToRemove.add(r); + } + } + if (refsToRemove.size() > 0) { + store.removeResourceRefs(key, refsToRemove, false); + } + } + } + + /** + * A status indicating what happened with the processing of a given cache + * entry. + */ + private enum TaskStatus { + INIT, + /** Entry was successfully processed, but not deleted **/ + PROCESSED, + /** Entry was successfully deleted **/ + DELETED, + /** The cleaner task ran into an error while processing the entry **/ + ERROR; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index fde4935..4c733aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -77,6 +77,9 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { this.store = createSCMStoreService(conf, context); addService(store); + CleanerService cs = createCleanerService(appChecker, store, context); + addService(cs); + } catch (IOException e) { LOG.error("Encountered unexpected exception while initializing the shared cache manager", e); @@ -184,6 +187,11 @@ private static SCMStore 
createSCMStoreService(Configuration conf, return store; } + private CleanerService createCleanerService(AppChecker appChecker, + SCMStore store, SCMContext context) { + return new CleanerService(appChecker, store, context); + } + @Override protected synchronized void serviceStart() throws Exception { // Start metrics diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java new file mode 100644 index 0000000..166d46a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsAnnotations; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +/** + * This class is for maintaining the various Cleaner activity statistics and + * publishing them through the metrics interfaces. + */ +@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn") +public class CleanerMetrics { + public static final Log LOG = LogFactory.getLog(CleanerMetrics.class); + private final MetricsRegistry registry = new MetricsRegistry("cleaner"); + + enum Singleton { + INSTANCE; + + CleanerMetrics impl; + + synchronized CleanerMetrics init(Configuration conf) { + if (impl == null) { + impl = create(conf); + } + return impl; + } + } + + public static CleanerMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static CleanerMetrics getInstance() { + CleanerMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The CleanerMetics singlton instance is not initialized." 
+ + " Have you called init first?"); + return topMetrics; + } + + @Metric("number of deleted files over all runs") + private MutableCounterLong totalDeletedFiles; + + public long getTotalDeletedFiles() { + return totalDeletedFiles.value(); + } + + private @Metric("number of deleted files in the last run") + MutableGaugeLong deletedFiles; + + public long getDeletedFiles() { + return deletedFiles.value(); + } + + private @Metric("number of processed files over all runs") + MutableCounterLong totalProcessedFiles; + + public long getTotalProcessedFiles() { + return totalProcessedFiles.value(); + } + + private @Metric("number of processed files in the last run") + MutableGaugeLong processedFiles; + + public long getProcessedFiles() { + return processedFiles.value(); + } + + private @Metric("Rate of deleting the files over all runs") + MutableGaugeInt totalDeleteRate; + + public int getTotalDeleteRate() { + return totalDeleteRate.value(); + } + + private @Metric("Rate of deleting the files over the last run") + MutableGaugeInt deleteRate; + + public int getDeleteRate() { + return deleteRate.value(); + } + + private @Metric("Rate of prcessing the files over all runs") + MutableGaugeInt totalProcessRate; + + public int getTotalProcessRate() { + return totalProcessRate.value(); + } + + private @Metric("Rate of prcessing the files over the last run") + MutableGaugeInt processRate; + + public int getProcessRate() { + return processRate.value(); + } + + CleanerMetrics() { + } + + /** + * The metric source obtained after parsing the annotations + */ + MetricsSource metricSource; + + /** + * The start of the last run of cleaner is ms + */ + private AtomicLong lastRunStart = new AtomicLong(System.currentTimeMillis()); + + /** + * The end of the last run of cleaner is ms + */ + private AtomicLong lastRunEnd = new AtomicLong(System.currentTimeMillis()); + + /** + * The sum of the durations of the last runs + */ + private AtomicLong sumOfPastPeriods = new AtomicLong(0); + + public 
MetricsSource getMetricSource() { + return metricSource; + } + + static CleanerMetrics create(Configuration conf) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + CleanerMetrics metricObject = new CleanerMetrics(); + MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject); + final MetricsSource s = sb.build(); + ms.register("cleaner", "The cleaner service of truly shared cache", s); + metricObject.metricSource = s; + return metricObject; + } + + /** + * Report a delete operation at the current system time + */ + public void reportAFileDelete() { + long time = System.currentTimeMillis(); + reportAFileDelete(time); + } + + /** + * Report a delete operation at the specified time. + * Delete implies process as well. + * @param time + */ + public void reportAFileDelete(long time) { + totalProcessedFiles.incr(); + processedFiles.incr(); + totalDeletedFiles.incr(); + deletedFiles.incr(); + updateRates(time); + lastRunEnd.set(time); + } + + /** + * Report a process operation at the current system time + */ + public void reportAFileProcess() { + long time = System.currentTimeMillis(); + reportAFileProcess(time); + } + + /** + * Report a process operation at the specified time + * + * @param time + */ + public void reportAFileProcess(long time) { + totalProcessedFiles.incr(); + processedFiles.incr(); + updateRates(time); + lastRunEnd.set(time); + } + + private void updateRates(long time) { + long startTime = lastRunStart.get(); + long lastPeriod = time - startTime; + long sumPeriods = sumOfPastPeriods.get() + lastPeriod; + float lastRunProcessRate = ((float) processedFiles.value()) / lastPeriod; + processRate.set(ratePerMsToHour(lastRunProcessRate)); + float totalProcessRateMs = ((float) totalProcessedFiles.value()) / sumPeriods; + totalProcessRate.set(ratePerMsToHour(totalProcessRateMs)); + float lastRunDeleteRate = ((float) deletedFiles.value()) / lastPeriod; + deleteRate.set(ratePerMsToHour(lastRunDeleteRate)); + float totalDeleteRateMs = 
((float) totalDeletedFiles.value()) / sumPeriods; + totalDeleteRate.set(ratePerMsToHour(totalDeleteRateMs)); + } + + /** + * Report the start a new run of the cleaner at the current system time + */ + public void reportCleaningStart() { + long time = System.currentTimeMillis(); + reportCleaningStart(time); + } + + /** + * Report the start a new run of the cleaner at the specified time + * + * @param time + */ + public void reportCleaningStart(long time) { + long lastPeriod = lastRunEnd.get() - lastRunStart.get(); + if (lastPeriod < 0) { + LOG.error("No operation since last start!"); + lastPeriod = 0; + } + lastRunStart.set(time); + processedFiles.set(0); + deletedFiles.set(0); + processRate.set(0); + deleteRate.set(0); + sumOfPastPeriods.addAndGet(lastPeriod); + registry.tag(SessionId, Long.toString(time), true); + } + + static int ratePerMsToHour(float ratePerMs) { + float ratePerHour = ratePerMs * 1000 * 3600; + return (int) ratePerHour; + } + + public static boolean isCleanerMetricRecord(String name) { + return (name.startsWith("cleaner")); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java new file mode 100644 index 0000000..84672e7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.impl.MsInfo; + +/** + * Very simple metric collector for the cleaner service + */ +public class CleanerMetricsCollector implements MetricsCollector { + + /** + * A map from reporting period to the list of reported metrics in that period + */ + Map metrics = new HashMap(); + String sessionId = null; + + @Override + public MetricsRecordBuilder addRecord(String name) { + // For the cleaner service we need to record only the metrics destined for the + // cleaner service. We hence ignore the others. + if (CleanerMetrics.isCleanerMetricRecord(name)) { + return new CleanerMetricsRecordBuilder(metrics); + } + else + return new NullMetricsRecordBuilder(); + } + + @Override + public MetricsRecordBuilder addRecord(MetricsInfo info) { + return addRecord(info.name()); + } + + public Map getMetrics() { + return metrics; + } + + public String getSessionId() { + return sessionId; + } + + /** + * A builder which ignores all the added metrics, tags, etc. 
+ */ + class NullMetricsRecordBuilder extends MetricsRecordBuilder { + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return this; + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return this; + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + return this; + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return this; + } + + @Override + public MetricsCollector parent() { + return CleanerMetricsCollector.this; + } + } + + /** + * A builder which keeps track of only counters and gauges + */ + class CleanerMetricsRecordBuilder extends NullMetricsRecordBuilder { + Map metricValueMap; + + public CleanerMetricsRecordBuilder(Map metricValueMap) { + this.metricValueMap = metricValueMap; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + if (MsInfo.SessionId.equals(info)) + setSessionId(value); + return this; + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + if (MsInfo.SessionId.equals(tag.info())) + setSessionId(tag.value()); + return this; + } + + private void setSessionId(String value) { + sessionId = value; + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + metricValueMap.put(metric.name(), metric.value()); + return this; + } + + @Override + public 
MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + metricValueMap.put(info.name(), value); + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + metricValueMap.put(info.name(),value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + metricValueMap.put(info.name(),value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + metricValueMap.put(info.name(),value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + metricValueMap.put(info.name(),value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + metricValueMap.put(info.name(),value); + return this; + } + + @Override + public MetricsCollector parent() { + return CleanerMetricsCollector.this; + } + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java new file mode 100644 index 0000000..6bffcc6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.Test; + +public class TestCleanerTask { + private static final String ROOT = + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT; + private static final int STALENESS_MINUTES = + YarnConfiguration.DEFAULT_SCM_CLEANER_STALENESS_MINUTES; + private static final long SLEEP_TIME = + YarnConfiguration.DEFAULT_SCM_CLEANER_CLEANING_SLEEP_BETWEEN_CLEAN_MS; + private static final int NESTED_LEVEL = + 
YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + + @Test + public void testNonExistentRoot() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + SCMContext context = mock(SCMContext.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, context, metrics, + new AtomicBoolean()); + // the shared cache root does not exist + when(fs.exists(task.getRootPath())).thenReturn(false); + + task.run(); + + // process() should not be called + verify(task, never()).process(); + } + + @Test + public void testProcessFreshEntry() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + SCMContext context = mock(SCMContext.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, context, metrics, + new AtomicBoolean()); + + // mock an entry whose modification timestamp is fresh + long staleModificationTime = + System.currentTimeMillis() - + TimeUnit.MINUTES.toMillis(STALENESS_MINUTES - 1); + FileStatus entry = mock(FileStatus.class); + when(fs.exists(new Path(ROOT))).thenReturn(true); + when(entry.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + when(entry.getModificationTime()).thenReturn(staleModificationTime); + when(store.getAccessTime(isA(String.class))).thenReturn(-1L); + + // process the entry + task.processSingleEntry(entry); + + // the directory should not be renamed + verify(fs, never()).rename(eq(entry.getPath()), isA(Path.class)); + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } + + @Test + public void testProcessStaleEntry() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = 
mock(AppChecker.class); + when(appChecker.appIsActive(isA(ApplicationId.class))).thenReturn(false); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + SCMContext context = mock(SCMContext.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, context, metrics, + new AtomicBoolean()); + + // mock an entry whose modification timestamp is stale + long staleModificationTime = + System.currentTimeMillis() - + TimeUnit.MINUTES.toMillis(STALENESS_MINUTES + 1); + FileStatus entry = mock(FileStatus.class); + String file = ROOT + "/a/b/c/abc"; + when(entry.getPath()).thenReturn(new Path(file)); + when(entry.getModificationTime()).thenReturn(staleModificationTime); + when(store.getAccessTime(isA(String.class))).thenReturn(-1L); + // mock the task to determine that this entry is in use + doReturn(true).when(task).entryIsStale(isA(String.class), + isA(FileStatus.class)); + when(store.removeKey(isA(String.class))).thenReturn(true); + // rename succeeds + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + // delete returns true + when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true); + + // process the entry + task.processSingleEntry(entry); + + // the directory should be renamed + verify(fs).rename(eq(entry.getPath()), isA(Path.class)); + // metrics should record a deleted file + verify(metrics).reportAFileDelete(); + verify(metrics, never()).reportAFileProcess(); + } + + private CleanerTask createSpiedTask(FileSystem fs, AppChecker appChecker, + SCMStore store, SCMContext context, CleanerMetrics metrics, + AtomicBoolean isCleanerRunning) { + return spy(new CleanerTask(ROOT, STALENESS_MINUTES, SLEEP_TIME, + NESTED_LEVEL, fs, appChecker, store, context, metrics, + isCleanerRunning, true)); + } + + @Test + public void testEntryIsInUseEntryIsRefreshed() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = 
mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + SCMContext context = mock(SCMContext.class); + + // have store say the entry does not exist + when(store.getAccessTime(isA(String.class))).thenReturn(-1L); + // have the filesystem return a fresh modification timestamp + FileStatus status = mock(FileStatus.class); + when(status.getModificationTime()).thenReturn(System.currentTimeMillis()); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, context, metrics, + new AtomicBoolean()); + + // call entryIsInUse() + assertFalse(task.entryIsStale("fooKey", status)); + } + + @Test + public void testEntryIsInUseHasAnActiveApp() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + SCMContext context = mock(SCMContext.class); + + long staleModificationTime = + System.currentTimeMillis() - + TimeUnit.MINUTES.toMillis(STALENESS_MINUTES + 1); + FileStatus entry = mock(FileStatus.class); + when(entry.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + when(entry.getModificationTime()).thenReturn(staleModificationTime); + when(store.getAccessTime(isA(String.class))).thenReturn(-1L); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, context, metrics, + new AtomicBoolean()); + + when(store.removeKey(isA(String.class))).thenReturn(false); + + // process the entry + task.processSingleEntry(entry); + + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } + + @Test + public void testProcessInitialActiveApps() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + + // prepare an initial active app list + List ids = new ArrayList(); + // app id for which app checker will return 
active + ApplicationId id1 = mock(ApplicationId.class); + when(appChecker.appIsActive(id1)).thenReturn(true); + ids.add(id1); + // app id for which app checker will return inactive + ApplicationId id2 = mock(ApplicationId.class); + when(appChecker.appIsActive(id2)).thenReturn(false); + ids.add(id2); + SCMContext context = mock(SCMContext.class); + when(context.getInitialActiveApps()).thenReturn(ids); + // create the store + SCMStore store = new InMemorySCMStore(context); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, context, metrics, + new AtomicBoolean()); + when(fs.exists(task.getRootPath())).thenReturn(true); + + task.run(); + + // process() should not be called + verify(task, never()).process(); + // the initial active app list should be down to 1 + assertSame(1, ids.size()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java new file mode 100644 index 0000000..ddada63 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;

import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link CleanerMetrics}: verifies per-run and cumulative
 * counters and rates across simulated cleaner runs.
 */
public class TestCleanerMetrics {

  Configuration conf = new Configuration();
  // simulated clock, advanced manually by the test
  long currTime = System.currentTimeMillis();
  // durations are floats to allow floating point rate calculation
  float lastDuration = 0;
  float totalDurations = 0;
  CleanerMetrics cleanerMetrics;

  @Before
  public void init() {
    CleanerMetrics.initSingleton(conf);
    cleanerMetrics = CleanerMetrics.getInstance();
  }

  @Test
  public void testMetricsOverMultiplePeriods() {
    simulateACleanerRun();
    assertMetrics(4, 4, 1, 1);
    currTime += 1300;
    simulateACleanerRun();
    assertMetrics(4, 8, 1, 2);
  }

  /**
   * Simulates one cleaner run: one delete (which also counts as a process)
   * and three plain processes, spread over 10 ms of simulated time.
   */
  public void simulateACleanerRun() {
    long startTime = currTime;
    cleanerMetrics.reportCleaningStart(currTime);
    currTime += 9;
    cleanerMetrics.reportAFileProcess(currTime);
    cleanerMetrics.reportAFileDelete(currTime);
    currTime++;
    cleanerMetrics.reportAFileProcess(currTime);
    cleanerMetrics.reportAFileProcess(currTime);
    lastDuration = currTime - startTime;
    totalDurations += lastDuration;
  }

  /**
   * Asserts the per-run and cumulative file counts and the derived rates.
   *
   * @param proc expected processed files in the last run
   * @param totalProc expected processed files over all runs
   * @param del expected deleted files in the last run
   * @param totalDel expected deleted files over all runs
   */
  void assertMetrics(int proc, int totalProc, int del, int totalDel) {
    Assert.assertEquals(
        "Processed files in the last period are not measured correctly", proc,
        cleanerMetrics.getProcessedFiles());
    Assert.assertEquals("Total processed files are not measured correctly",
        totalProc, cleanerMetrics.getTotalProcessedFiles());
    Assert.assertEquals(
        "Deleted files in the last period are not measured correctly", del,
        cleanerMetrics.getDeletedFiles());
    Assert.assertEquals("Total deleted files are not measured correctly",
        totalDel, cleanerMetrics.getTotalDeletedFiles());

    Assert
        .assertEquals(
            "Rate of processed files in the last period are not measured correctly",
            CleanerMetrics.ratePerMsToHour(proc / lastDuration),
            cleanerMetrics.getProcessRate());
    Assert.assertEquals(
        "Rate of total processed files are not measured correctly",
        CleanerMetrics.ratePerMsToHour(totalProc / totalDurations),
        cleanerMetrics.getTotalProcessRate());
    Assert.assertEquals(
        "Rate of deleted files in the last period are not measured correctly",
        CleanerMetrics.ratePerMsToHour(del / lastDuration),
        cleanerMetrics.getDeleteRate());
    Assert.assertEquals(
        "Rate of total deleted files are not measured correctly",
        CleanerMetrics.ratePerMsToHour(totalDel / totalDurations),
        cleanerMetrics.getTotalDeleteRate());
  }
}