diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 59e303f..bccd79b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1385,25 +1385,54 @@ * the last reference exceeds the staleness period. This value is specified in * minutes. */ - public static final String IN_MEMORY_STALENESS_PERIOD = - IN_MEMORY_STORE_PREFIX + "staleness-period"; - public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60; + public static final String IN_MEMORY_STALENESS_PERIOD_MINS = + IN_MEMORY_STORE_PREFIX + "staleness-period-mins"; + public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS = + 7 * 24 * 60; /** * Initial delay before the in-memory store runs its first check to remove * dead initial applications. Specified in minutes. */ - public static final String IN_MEMORY_INITIAL_DELAY = - IN_MEMORY_STORE_PREFIX + "initial-delay"; - public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10; + public static final String IN_MEMORY_INITIAL_DELAY_MINS = + IN_MEMORY_STORE_PREFIX + "initial-delay-mins"; + public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10; /** * The frequency at which the in-memory store checks to remove dead initial * applications. Specified in minutes. */ - public static final String IN_MEMORY_CHECK_PERIOD = - IN_MEMORY_STORE_PREFIX + "check-period"; - public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60; + public static final String IN_MEMORY_CHECK_PERIOD_MINS = + IN_MEMORY_STORE_PREFIX + "check-period-mins"; + public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS = 12 * 60; + + // SCM Cleaner service configuration + + private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX + + "cleaner."; + + /** + * The frequency at which a cleaner task runs. Specified in minutes. + */ + public static final String SCM_CLEANER_PERIOD_MINS = + SCM_CLEANER_PREFIX + "period-mins"; + public static final int DEFAULT_SCM_CLEANER_PERIOD_MINS = 24 * 60; + + /** + * Initial delay before the first cleaner task is scheduled. Specified in + * minutes. + */ + public static final String SCM_CLEANER_INITIAL_DELAY_MINS = + SCM_CLEANER_PREFIX + "initial-delay-mins"; + public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS = 10; + + /** + * The time to sleep between processing each shared cache resource. Specified + * in milliseconds. + */ + public static final String SCM_CLEANER_RESOURCE_SLEEP_MS = + SCM_CLEANER_PREFIX + "resource-sleep-ms"; + public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L; //////////////////////////////// // Other Configs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 3c3d7e3..fb63d86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1374,24 +1374,45 @@ A resource in the in-memory store is considered stale if the time since the last reference exceeds the staleness period. This value is specified in minutes. 
- yarn.sharedcache.store.in-memory.staleness-period + yarn.sharedcache.store.in-memory.staleness-period-mins 10080 Initial delay before the in-memory store runs its first check to remove dead initial applications. Specified in minutes. - yarn.sharedcache.store.in-memory.initial-delay + yarn.sharedcache.store.in-memory.initial-delay-mins 10 The frequency at which the in-memory store checks to remove dead initial applications. Specified in minutes. - yarn.sharedcache.store.in-memory.check-period + yarn.sharedcache.store.in-memory.check-period-mins 720 + + The frequency at which a cleaner task runs. + Specified in minutes. + yarn.sharedcache.cleaner.period-mins + 1440 + + + + Initial delay before the first cleaner task is scheduled. + Specified in minutes. + yarn.sharedcache.cleaner.initial-delay-mins + 10 + + + + The time to sleep between processing each shared cache + resource. Specified in milliseconds. + yarn.sharedcache.cleaner.resource-sleep-ms + 0 + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java index 4b933ac..d3cf379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java @@ -78,4 +78,14 @@ public static String getCacheEntryPath(int cacheDepth, String cacheRoot, return sb.toString(); } + + @Private + public static String getCacheEntryGlobPattern(int depth) { + StringBuilder pattern = new StringBuilder(); + for (int i = 0; i < depth; i++) { + pattern.append("*/"); + } + pattern.append("*"); + return pattern.toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java new file mode 100644 index 0000000..13cf8f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * The cleaner service that maintains the shared cache area, and cleans up stale + * entries on a regular basis. + */ +@Private +@Evolving +public class CleanerService extends CompositeService { + /** + * The name of the global cleaner lock that the cleaner creates to indicate + * that a cleaning process is in progress. + */ + public static final String GLOBAL_CLEANER_PID = ".cleaner_pid"; + + private static final Log LOG = LogFactory.getLog(CleanerService.class); + + private Configuration conf; + private AppChecker appChecker; + private CleanerMetrics metrics; + private ScheduledExecutorService scheduledExecutor; + private final SCMStore store; + private final AtomicBoolean cleanerTaskRunning; + + public CleanerService(SCMStore store) { + super("CleanerService"); + this.store = store; + this.cleanerTaskRunning = new AtomicBoolean(false); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + + appChecker = createAppCheckerService(conf); + addService(appChecker); + + // create a single threaded scheduler service that services the cleaner task + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(tf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (!writeGlobalCleanerPidFile()) { + throw new YarnException("The global cleaner pid file already exists! " + + "It appears there is another CleanerService running in the cluster"); + } + + this.metrics = CleanerMetrics.initSingleton(conf); + + // Start dependent services (i.e. 
AppChecker) + super.serviceStart(); + + Runnable task = + CleanerTask.create(conf, store, metrics, cleanerTaskRunning, true); + long periodInMinutes = getPeriod(conf); + scheduledExecutor.scheduleAtFixedRate(task, getInitialDelay(conf), + periodInMinutes, TimeUnit.MINUTES); + LOG.info("Scheduled the shared cache cleaner task to run every " + + periodInMinutes + " minutes."); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Shutting down the background thread."); + scheduledExecutor.shutdownNow(); + try { + if (scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.info("The background thread stopped."); + } else { + LOG.warn("Gave up waiting for the cleaner task to shutdown."); + } + } catch (InterruptedException e) { + LOG.warn("The cleaner service was interrupted while shutting down the task.", + e); + } + + removeGlobalCleanerPidFile(); + + super.serviceStop(); + } + + @VisibleForTesting + AppChecker createAppCheckerService(Configuration conf) { + return SharedCacheManager.createAppCheckerService(conf); + } + + /** + * If no other cleaner task is running, execute an on-demand cleaner task. + * + * @return true if the cleaner task was started, false if there was already a + * cleaner task running. + */ + protected boolean runCleanerTask() { + + if (!this.cleanerTaskRunning.compareAndSet(false, true)) { + LOG.warn("A cleaner task is already running. " + + "A new on-demand cleaner task will not be submitted."); + return false; + } + + Runnable task = + CleanerTask.create(conf, store, metrics, cleanerTaskRunning, false); + // this is a non-blocking call (it simply submits the task to the executor + // queue and returns) + this.scheduledExecutor.execute(task); + /* + * We return true if the task is accepted for execution by the executor. Two + * notes: 1. There is a small race here between a scheduled task and an + * on-demand task. If the scheduled task happens to start after we check/set + * cleanerTaskRunning, but before we call execute, we will get two tasks + * that run back to back. Luckily, since we have already set + * cleanerTaskRunning, the scheduled task will do nothing and the on-demand + * task will clean. 2. We do not need to set the cleanerTaskRunning boolean + * back to false because this will be done in the task itself. + */ + return true; + } + + /** + * To ensure there are not multiple instances of the SCM running on a given + * cluster, a global pid file is used. This file contains the hostname of the + * machine that owns the pid file. 
+   *
+   * @return true if the pid file was written, false otherwise
+   * @throws YarnException
+   */
+  private boolean writeGlobalCleanerPidFile() throws YarnException {
+    String root =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+
+      if (fs.exists(pidPath)) {
+        return false;
+      }
+
+      FSDataOutputStream os = fs.create(pidPath, false);
+      // write the hostname and the process id in the global cleaner pid file
+      final String ID = ManagementFactory.getRuntimeMXBean().getName();
+      os.writeUTF(ID);
+      os.close();
+      // add it to the delete-on-exit to ensure it gets deleted when the JVM
+      // exits
+      fs.deleteOnExit(pidPath);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    LOG.info("Created the global cleaner pid file at " + pidPath.toString());
+    return true;
+  }
+
+  private void removeGlobalCleanerPidFile() {
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+      String root =
+          conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+      Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+
+
+      fs.delete(pidPath, false);
+      LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
+    } catch (IOException e) {
+      LOG.error(
+          "Unable to remove the global cleaner pid file! The file may need "
+              + "to be removed manually.", e);
+    }
+  }
+
+  private static int getInitialDelay(Configuration conf) {
+    int initialDelayInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS);
+    // a negative value is invalid; fail fast
+    if (initialDelayInMinutes < 0) {
+      throw new HadoopIllegalArgumentException("Negative initial delay value: "
+          + initialDelayInMinutes
+          + ". The initial delay must be non-negative.");
+    }
+    return initialDelayInMinutes;
+  }
+
+  private static int getPeriod(Configuration conf) {
+    int periodInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS);
+    // a non-positive value is invalid; fail fast
+    if (periodInMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive period value: "
+          + periodInMinutes
+          + ". The cleaner period must be greater than zero.");
+    }
+    return periodInMinutes;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
new file mode 100644
index 0000000..9cee90a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +/** + * The task that runs and cleans up the shared cache area for stale entries and + * orphaned files. It is expected that only one cleaner task runs at any given + * point in time. + */ +@Private +@Evolving +class CleanerTask implements Runnable { + private static final String RENAMED_SUFFIX = "-renamed"; + private static final Log LOG = LogFactory.getLog(CleanerTask.class); + + private final String location; + private final long sleepTime; + private final int nestedLevel; + private final Path root; + private final FileSystem fs; + private final SCMStore store; + private final CleanerMetrics metrics; + private final AtomicBoolean cleanerTaskIsRunning; + private final boolean isScheduledTask; + + /** + * Creates a cleaner task based on the configuration. This is provided for + * convenience. + * + * @param conf + * @param store + * @param metrics + * @param cleanerTaskRunning true if there is another cleaner task currently + * running + * @param isScheduledTask true if the task is a scheduled task + * @return an instance of a CleanerTask + */ + public static CleanerTask create(Configuration conf, SCMStore store, + CleanerMetrics metrics, AtomicBoolean cleanerTaskRunning, + boolean isScheduledTask) { + try { + // get the root directory for the shared cache + String location = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + long sleepTime = + conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS, + YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS); + int nestedLevel = SharedCacheUtil.getCacheDepth(conf); + FileSystem fs = FileSystem.get(conf); + + return new CleanerTask(location, sleepTime, nestedLevel, fs, store, + metrics, cleanerTaskRunning, isScheduledTask); + } catch (IOException e) { + LOG.error("Unable to obtain the filesystem for the cleaner service", e); + throw new ExceptionInInitializerError(e); + } + } + + /** + * Creates a cleaner task based on the root directory location and the + * filesystem. 
+ */ + CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs, + SCMStore store, CleanerMetrics metrics, + AtomicBoolean cleanerTaskIsRunning, boolean isScheduledTask) { + this.location = location; + this.sleepTime = sleepTime; + this.nestedLevel = nestedLevel; + this.root = new Path(location); + this.fs = fs; + this.store = store; + this.metrics = metrics; + this.cleanerTaskIsRunning = cleanerTaskIsRunning; + this.isScheduledTask = isScheduledTask; + } + + @Override + public void run() { + // check if it is a scheduled task + if (isScheduledTask + && !this.cleanerTaskIsRunning.compareAndSet(false, true)) { + // this is a scheduled task and there is already another task running + LOG.warn("A cleaner task is already running. " + + "This scheduled cleaner task will do nothing."); + return; + } + + try { + if (!fs.exists(root)) { + LOG.error("The shared cache root " + location + " was not found. " + + "The cleaner task will do nothing."); + return; + } + + // we're now ready to process the shared cache area + process(); + } catch (Throwable e) { + LOG.error("Unexpected exception while initializing the cleaner task. " + + "This task will do nothing,", e); + } finally { + // this is set to false regardless of if it is a scheduled or on-demand + // task + this.cleanerTaskIsRunning.set(false); + } + } + + /** + * Sweeps and processes the shared cache area to clean up stale and orphaned + * files. + */ + void process() { + // mark the beginning of the run in the metrics + metrics.reportCleaningStart(); + try { + // now traverse individual directories and process them + // the directory structure is specified by the nested level parameter + // (e.g. 9/c/d/) + String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel); + FileStatus[] resources = + fs.globStatus(new Path(root, pattern)); + int numResources = resources == null ? 0 : resources.length; + LOG.info("Processing " + numResources + " resources in the shared cache"); + long beginMs = System.currentTimeMillis(); + if (resources != null) { + for (FileStatus resource : resources) { + // check for interruption so it can abort in a timely manner in case + // of shutdown + if (Thread.currentThread().isInterrupted()) { + LOG.warn("The cleaner task was interrupted. Aborting."); + break; + } + + if (resource.isDirectory()) { + processSingleResource(resource); + } else { + LOG.warn("Invalid file at path " + resource.getPath().toString() + + + " when a directory was expected"); + } + // add sleep time between cleaning each directory if it is non-zero + if (sleepTime > 0) { + Thread.sleep(sleepTime); + } + } + } + long endMs = System.currentTimeMillis(); + long durationMs = endMs - beginMs; + LOG.info("Processed " + numResources + " resource(s) in " + durationMs + + " ms."); + } catch (IOException e1) { + LOG.error("Unable to complete the cleaner task", e1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); // restore the interrupt + } + } + + /** + * Returns a path for the root directory for the shared cache. + */ + Path getRootPath() { + return root; + } + + /** + * Processes a single shared cache resource directory. 
+ */ + void processSingleResource(FileStatus resource) { + Path path = resource.getPath(); + // indicates the processing status of the resource + ResourceStatus resourceStatus = ResourceStatus.INIT; + + // first, if the path ends with the renamed suffix, it indicates the + // directory was moved (as stale) but somehow not deleted (probably due to + // SCM failure); delete the directory + if (path.toString().endsWith(RENAMED_SUFFIX)) { + LOG.info("Found a renamed directory that was left undeleted at " + + path.toString() + ". Deleting."); + try { + if (fs.delete(path, true)) { + resourceStatus = ResourceStatus.DELETED; + } + } catch (IOException e) { + LOG.error("Error while processing a shared cache resource: " + path, e); + } + } else { + // this is the path to the cache resource directory + // the directory name is the resource key (i.e. a unique identifier) + String key = path.getName(); + + try { + store.cleanResourceReferences(key); + } catch (YarnException e) { + LOG.error("Exception thrown while removing dead appIds.", e); + } + + if (store.isResourceEvictable(key, resource)) { + try { + /* + * TODO See YARN-2663: There is a race condition between + * store.removeResource(key) and + * removeResourceFromCacheFileSystem(path) operations because they do + * not happen atomically and resources can be uploaded with different + * file names by the node managers. + */ + // remove the resource from scm (checks for appIds as well) + if (store.removeResource(key)) { + // remove the resource from the file system + boolean deleted = removeResourceFromCacheFileSystem(path); + if (deleted) { + resourceStatus = ResourceStatus.DELETED; + } else { + LOG.error("Failed to remove path from the file system." + + " Skipping this resource: " + path); + resourceStatus = ResourceStatus.ERROR; + } + } else { + // we did not delete the resource because it contained application + // ids + resourceStatus = ResourceStatus.PROCESSED; + } + } catch (IOException e) { + LOG.error( + "Failed to remove path from the file system. Skipping this resource: " + + path, e); + resourceStatus = ResourceStatus.ERROR; + } + } else { + resourceStatus = ResourceStatus.PROCESSED; + } + } + + // record the processing + switch (resourceStatus) { + case DELETED: + metrics.reportAFileDelete(); + break; + case PROCESSED: + metrics.reportAFileProcess(); + break; + case ERROR: + metrics.reportAFileError(); + break; + default: + LOG.error("Cleaner encountered an invalid status (" + resourceStatus + + ") while processing resource: " + path.getName()); + } + } + + private boolean removeResourceFromCacheFileSystem(Path path) + throws IOException { + // rename the directory to make the delete atomic + Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX); + if (fs.rename(path, renamedPath)) { + // the directory can be removed safely now + // log the original path + LOG.info("Deleting " + path.toString()); + return fs.delete(renamedPath, true); + } else { + // we were unable to remove it for some reason: it's best to leave + // it at that + LOG.error("We were not able to rename the directory to " + + renamedPath.toString() + ". We will leave it intact."); + } + return false; + } + + /** + * A status indicating what happened with the processing of a given cache + * resource. 
+ */ + private enum ResourceStatus { + INIT, + /** Resource was successfully processed, but not deleted **/ + PROCESSED, + /** Resource was successfully deleted **/ + DELETED, + /** The cleaner task ran into an error while processing the resource **/ + ERROR; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 2f3ddb1..3fdb588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -64,6 +64,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.store = createSCMStoreService(conf); addService(store); + CleanerService cs = createCleanerService(store); + addService(cs); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -90,6 +93,10 @@ private static SCMStore createSCMStoreService(Configuration conf) { return store; } + private CleanerService createCleanerService(SCMStore store) { + return new CleanerService(store); + } + @Override protected void serviceStop() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java new file mode 100644 index 0000000..5c8ea3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * This class is for maintaining the various Cleaner activity statistics and
+ * publishing them through the metrics interfaces.
+ */
+@Private
+@Evolving
+@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn")
+public class CleanerMetrics {
+  public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
+  private final MetricsRegistry registry = new MetricsRegistry("cleaner");
+
+  enum Singleton {
+    INSTANCE;
+
+    CleanerMetrics impl;
+
+    synchronized CleanerMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create(conf);
+      }
+      return impl;
+    }
+  }
+
+  public static CleanerMetrics initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  public static CleanerMetrics getInstance() {
+    CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null)
+      throw new IllegalStateException(
+          "The CleanerMetrics singleton instance is not initialized."
+              + " Have you called init first?");
+    return topMetrics;
+  }
+
+  @Metric("number of deleted files over all runs")
+  private MutableCounterLong totalDeletedFiles;
+
+  public long getTotalDeletedFiles() {
+    return totalDeletedFiles.value();
+  }
+
+  private @Metric("number of deleted files in the last run")
+  MutableGaugeLong deletedFiles;
+
+  public long getDeletedFiles() {
+    return deletedFiles.value();
+  }
+
+  private @Metric("number of processed files over all runs")
+  MutableCounterLong totalProcessedFiles;
+
+  public long getTotalProcessedFiles() {
+    return totalProcessedFiles.value();
+  }
+
+  private @Metric("number of processed files in the last run")
+  MutableGaugeLong processedFiles;
+
+  public long getProcessedFiles() {
+    return processedFiles.value();
+  }
+
+  @Metric("number of file errors over all runs")
+  private MutableCounterLong totalFileErrors;
+
+  public long getTotalFileErrors() {
+    return totalFileErrors.value();
+  }
+
+  private @Metric("number of file errors in the last run")
+  MutableGaugeLong fileErrors;
+
+  public long getFileErrors() {
+    return fileErrors.value();
+  }
+
+  private CleanerMetrics() {
+  }
+
+  /**
+   * The metric source obtained after parsing the annotations
+   */
+  MetricsSource metricSource;
+
+  static CleanerMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    CleanerMetrics metricObject = new CleanerMetrics();
+    MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject);
+    final MetricsSource s = sb.build();
+    ms.register("cleaner", "The cleaner service of truly shared cache", s);
+    metricObject.metricSource = s;
+    return metricObject;
+  }
+
+  /**
+   * Report a delete operation at the current system time
+   */
+  public void reportAFileDelete() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalDeletedFiles.incr();
+    deletedFiles.incr();
+  }
+
+  /**
+   * Report a process operation at the current system time
+   */
+  public void reportAFileProcess() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+  }
+
+  /**
+   * Report a process operation error at the current system time
+   */
+  public void reportAFileError() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalFileErrors.incr();
+    fileErrors.incr();
+  }
+
+  /**
+   * Report the start of a new run of the cleaner.
+ * + */ + public void reportCleaningStart() { + processedFiles.set(0); + deletedFiles.set(0); + fileErrors.set(0); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java index 79369d8..b8fe14f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; -import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -83,13 +82,12 @@ private final Object initialAppsLock = new Object(); private long startTime; private int stalenessMinutes; - private AppChecker appChecker; private ScheduledExecutorService scheduler; private int initialDelayMin; private int checkPeriodMin; - public InMemorySCMStore() { - super(InMemorySCMStore.class.getName()); + public InMemorySCMStore(AppChecker appChecker) { + super(InMemorySCMStore.class.getName(), appChecker); } private String intern(String key) { @@ -108,9 +106,6 @@ protected void serviceInit(Configuration conf) throws Exception { this.checkPeriodMin = getCheckPeriod(conf); this.stalenessMinutes = getStalenessPeriod(conf); - appChecker = createAppCheckerService(conf); - addService(appChecker); - bootstrap(conf); ThreadFactory tf = @@ -157,11 +152,6 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - @VisibleForTesting - AppChecker createAppCheckerService(Configuration conf) { - return SharedCacheManager.createAppCheckerService(conf); - } - private void bootstrap(Configuration conf) throws IOException { Map initialCachedResources = getInitialCachedResources(FileSystem.get(conf), conf); @@ -201,14 +191,10 @@ private void bootstrap(Configuration conf) throws IOException { // now traverse individual directories and process them // the directory structure is specified by the nested level parameter // (e.g. 9/c/d//file) - StringBuilder pattern = new StringBuilder(); - for (int i = 0; i < nestedLevel + 1; i++) { - pattern.append("*/"); - } - pattern.append("*"); + String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1); LOG.info("Querying for all individual cached resource files"); - FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString())); + FileStatus[] entries = fs.globStatus(new Path(root, pattern)); int numEntries = entries == null ? 0 : entries.length; LOG.info("Found " + numEntries + " files: processing for one resource per " + "key"); @@ -360,6 +346,17 @@ public void removeResourceReferences(String key, } /** + * Provides atomicity for the method. 
+ */ + @Override + public void cleanResourceReferences(String key) throws YarnException { + String interned = intern(key); + synchronized (interned) { + super.cleanResourceReferences(key); + } + } + + /** * Removes the given resource from the store. Returns true if the resource is * found and removed or if the resource is not found. Returns false if it was * unable to remove the resource because the resource reference list was not @@ -427,8 +424,8 @@ public boolean isResourceEvictable(String key, FileStatus file) { private static int getStalenessPeriod(Configuration conf) { int stalenessMinutes = - conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD, - YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD); + conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS, + YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS); // non-positive value is invalid; use the default if (stalenessMinutes <= 0) { throw new HadoopIllegalArgumentException("Non-positive staleness value: " @@ -440,8 +437,8 @@ private static int getStalenessPeriod(Configuration conf) { private static int getInitialDelay(Configuration conf) { int initialMinutes = - conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY, - YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY); + conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS, + YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS); // non-positive value is invalid; use the default if (initialMinutes <= 0) { throw new HadoopIllegalArgumentException( @@ -453,8 +450,8 @@ private static int getInitialDelay(Configuration conf) { private static int getCheckPeriod(Configuration conf) { int checkMinutes = - conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD, - YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD); + conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS, + YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS); // non-positive value is invalid; use the default if (checkMinutes <= 0) { throw new HadoopIllegalArgumentException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java index 397d904..6be00b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java @@ -19,11 +19,15 @@ package org.apache.hadoop.yarn.server.sharedcachemanager.store; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; /** @@ -35,8 +39,11 @@ @Evolving public abstract class SCMStore extends CompositeService { - protected SCMStore(String name) { + protected final AppChecker appChecker; + + protected SCMStore(String name, AppChecker appChecker) { super(name); + this.appChecker = appChecker; } /** @@ 
-119,6 +126,33 @@ public abstract void removeResourceReferences(String key,
       Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
 
   /**
+   * Clean all resource references to a cache resource that contain application
+   * ids pointing to finished applications. If the resource key does not exist,
+   * do nothing.
+   *
+   * @param key a unique identifier for a resource
+   * @throws YarnException
+   */
+  @Private
+  public void cleanResourceReferences(String key) throws YarnException {
+    Collection<SharedCacheResourceReference> refs = getResourceReferences(key);
+    if (!refs.isEmpty()) {
+      Set<SharedCacheResourceReference> refsToRemove =
+          new HashSet<SharedCacheResourceReference>();
+      for (SharedCacheResourceReference r : refs) {
+        if (!appChecker.isApplicationActive(r.getAppId())) {
+          // application in resource reference is dead, it is safe to remove the
+          // reference
+          refsToRemove.add(r);
+        }
+      }
+      if (refsToRemove.size() > 0) {
+        removeResourceReferences(key, refsToRemove, false);
+      }
+    }
+  }
+
+  /**
    * Check if a specific resource is evictable according to the store's enabled
    * cache eviction policies.
    *
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
new file mode 100644
index 0000000..0e5c957
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.Test; + +public class TestCleanerTask { + private static final String ROOT = + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT; + private static final long SLEEP_TIME = + YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS; + private static final int NESTED_LEVEL = + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + + @Test + public void testNonExistentRoot() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + // the shared cache root does not exist + when(fs.exists(task.getRootPath())).thenReturn(false); + + task.run(); + + // process() should not be called + verify(task, never()).process(); + } + + @Test + public void testProcessFreshResource() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + + // mock a resource that is not evictable + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(false); + FileStatus status = mock(FileStatus.class); + when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + + // process the resource + task.processSingleResource(status); + + // the directory should not be renamed + verify(fs, never()).rename(eq(status.getPath()), isA(Path.class)); + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } + + @Test + public void testProcessEvictableResource() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + when(appChecker.isApplicationActive(isA(ApplicationId.class))).thenReturn( + false); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + + // mock an evictable resource + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(true); + FileStatus status = mock(FileStatus.class); + when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + when(store.removeResource(isA(String.class))).thenReturn(true); + // rename succeeds + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + // delete returns true + 
when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true); + + // process the resource + task.processSingleResource(status); + + // the directory should be renamed + verify(fs).rename(eq(status.getPath()), isA(Path.class)); + // metrics should record a deleted file + verify(metrics).reportAFileDelete(); + verify(metrics, never()).reportAFileProcess(); + } + + private CleanerTask createSpiedTask(FileSystem fs, AppChecker appChecker, + SCMStore store, CleanerMetrics metrics, AtomicBoolean isCleanerRunning) { + return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, store, + metrics, isCleanerRunning, true)); + } + + @Test + public void testResourceIsInUseHasAnActiveApp() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + FileStatus resource = mock(FileStatus.class); + when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + // resource is stale + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(true); + // but still has appIds + when(store.removeResource(isA(String.class))).thenReturn(false); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + + // process the resource + task.processSingleResource(resource); + + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java new file mode 100644 index 0000000..26ab179 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Before; +import org.junit.Test; + +public class TestCleanerMetrics { + + Configuration conf = new Configuration(); + CleanerMetrics cleanerMetrics; + + @Before + public void init() { + CleanerMetrics.initSingleton(conf); + cleanerMetrics = CleanerMetrics.getInstance(); + } + + @Test + public void testMetricsOverMultiplePeriods() { + simulateACleanerRun(); + assertMetrics(4, 4, 1, 1); + simulateACleanerRun(); + assertMetrics(4, 8, 1, 2); + } + + public void simulateACleanerRun() { + cleanerMetrics.reportCleaningStart(); + cleanerMetrics.reportAFileProcess(); + cleanerMetrics.reportAFileDelete(); + cleanerMetrics.reportAFileProcess(); + cleanerMetrics.reportAFileProcess(); + } + + void assertMetrics(int proc, int totalProc, int del, int totalDel) { + assertEquals( + "Processed files in the last period are not measured correctly", proc, + cleanerMetrics.getProcessedFiles()); + assertEquals("Total processed files are not measured correctly", + totalProc, cleanerMetrics.getTotalProcessedFiles()); + assertEquals( + "Deleted files in the last period are not measured correctly", del, + cleanerMetrics.getDeletedFiles()); + assertEquals("Total deleted files are not measured correctly", + totalDel, cleanerMetrics.getTotalDeletedFiles()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java index 891703e..831ef6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java @@ -60,10 +60,8 @@ @Before public void setup() { - this.store = spy(new InMemorySCMStore()); this.checker = spy(new DummyAppChecker()); - doReturn(checker).when(store).createAppCheckerService( - isA(Configuration.class)); + this.store = spy(new InMemorySCMStore(checker)); } @After
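Reviewer note (not part of the patch): the cleaner schedule added here is driven entirely by the new yarn.sharedcache.cleaner.* keys. For anyone trying the patch, an illustrative yarn-site.xml override might look like the sketch below; the key names and defaults come from the yarn-default.xml hunk in this patch, while the override values are arbitrary examples, not recommendations.

<property>
  <!-- default is 1440 (24 hours); run the cleaner twice a day instead -->
  <name>yarn.sharedcache.cleaner.period-mins</name>
  <value>720</value>
</property>
<property>
  <!-- default is 10; give the SCM more time to settle after startup -->
  <name>yarn.sharedcache.cleaner.initial-delay-mins</name>
  <value>30</value>
</property>
<property>
  <!-- default is 0; a small sleep throttles file system calls between resources -->
  <name>yarn.sharedcache.cleaner.resource-sleep-ms</name>
  <value>100</value>
</property>

Deployments that already override the in-memory store keys should also move to the renamed *-mins variants (for example yarn.sharedcache.store.in-memory.staleness-period-mins), since the old key names are no longer read after this change.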