diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f183a90..43c6325 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1374,6 +1374,34 @@ public static final String IN_MEMORY_CHECK_PERIOD = IN_MEMORY_STORE_PREFIX + "check-period"; public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60; + + // SCM Cleaner service configuration + + private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX + + "cleaner."; + + /** + * The frequency at which a cleaner task runs. Specified in minutes. + */ + public static final String SCM_CLEANER_PERIOD = + SCM_CLEANER_PREFIX + "period"; + public static final int DEFAULT_SCM_CLEANER_PERIOD = 24 * 60; + + /** + * Initial delay before the first cleaner task is scheduled. Specified in + * minutes. + */ + public static final String SCM_CLEANER_INITIAL_DELAY = + SCM_CLEANER_PREFIX + "initial-delay"; + public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY = 10; + + /** + * The time to sleep between processing each shared cache resource. Specified + * in milliseconds. 
+ */ + public static final String SCM_CLEANER_RESOURCE_SLEEP = + SCM_CLEANER_PREFIX + "resource-sleep"; + public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP = 0L; //////////////////////////////// // Other Configs diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5d53191..c24e731 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1376,6 +1376,27 @@ 720 + + The frequency at which a cleaner task runs. + Specified in minutes. + yarn.sharedcache.cleaner.period + 1440 + + + + Initial delay before the first cleaner task is scheduled. + Specified in minutes. + yarn.sharedcache.cleaner.initial-delay + 10 + + + + The time to sleep between processing each shared cache + resource. Specified in milliseconds. + yarn.sharedcache.cleaner.resource-sleep + 0 + + The interval that the yarn client library uses to poll the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java new file mode 100644 index 0000000..96a89ee --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * The cleaner service that maintains the shared cache area, and cleans up stale + * entries on a regular basis. 
+ */ +@Private +@Evolving +public class CleanerService extends CompositeService { + /** + * Priority of the cleaner shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + /** + * The name of the global cleaner lock that the cleaner creates to indicate + * that a cleaning process is in progress. + */ + public static final String GLOBAL_CLEANER_PID = ".cleaner_pid"; + + private static final Log LOG = LogFactory.getLog(CleanerService.class); + + private Configuration conf; + private AppChecker appChecker; + private CleanerMetrics metrics; + private ScheduledExecutorService scheduler; + private final SCMStore store; + private final AtomicBoolean cleanerTaskRunning; + + public CleanerService(SCMStore store) { + super("CleanerService"); + this.store = store; + this.cleanerTaskRunning = new AtomicBoolean(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + + appChecker = createAppCheckerService(conf); + addService(appChecker); + + // create a single threaded scheduler service that services the cleaner task + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); + scheduler = Executors.newSingleThreadScheduledExecutor(tf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (!writeGlobalCleanerPidFile()) { + throw new YarnException("The global cleaner pid file already exists!"); + } + + this.metrics = CleanerMetrics.initSingleton(conf); + + // Start dependent services (i.e. 
AppChecker) + super.serviceStart(); + + Runnable task = + CleanerTask.create(conf, appChecker, store, metrics, + cleanerTaskRunning, true); + long periodInMinutes = getPeriod(conf); + scheduler.scheduleAtFixedRate(task, getInitialDelay(conf), periodInMinutes, + TimeUnit.MINUTES); + LOG.info("Scheduled the shared cache cleaner task to run every " + + periodInMinutes + " minutes."); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Shutting down the background thread."); + scheduler.shutdownNow(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Gave up waiting for the cleaner task to shutdown."); + } + } catch (InterruptedException e) { + LOG.warn("The cleaner service was interrupted while shutting down the task.", + e); + } + LOG.info("The background thread stopped."); + + removeGlobalCleanerPidFile(); + + super.serviceStop(); + } + + @VisibleForTesting + AppChecker createAppCheckerService(Configuration conf) { + return SharedCacheManager.createAppCheckerService(conf); + } + + /** + * If no other cleaner task is running, execute an on-demand cleaner task. + * + * @return true if the cleaner task was started, false if there was already a + * cleaner task running. + */ + protected boolean runCleanerTask() { + + if (!this.cleanerTaskRunning.compareAndSet(false, true)) { + LOG.warn("A cleaner task is already running. " + + "A new on-demand cleaner task will not be submitted."); + return false; + } + + Runnable task = + CleanerTask.create(conf, appChecker, store, metrics, + cleanerTaskRunning, false); + // this is a non-blocking call (it simply submits the task to the executor + // queue and returns) + this.scheduler.execute(task); + /* + * We return true if the task is accepted for execution by the executor. Two + * notes: 1. There is a small race here between a scheduled task and an + * on-demand task. 
If the scheduled task happens to start after we check/set + * cleanerTaskRunning, but before we call execute, we will get two tasks + * that run back to back. Luckily, since we have already set + * cleanerTaskRunning, the scheduled task will do nothing and the on-demand + * task will clean. 2. We do not need to set the cleanerTaskRunning boolean + * back to false because this will be done in the task itself. + */ + return true; + } + + /** + * To ensure there are not multiple instances of the SCM running on a given + * cluster, a global pid file is used. This file contains the hostname of the + * machine that owns the pid file. + * + * @return true if the pid file was written, false otherwise + * @throws IOException + */ + private boolean writeGlobalCleanerPidFile() throws YarnException { + String root = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + Path pidPath = new Path(root, GLOBAL_CLEANER_PID); + try { + FileSystem fs = FileSystem.get(this.conf); + + if (fs.exists(pidPath)) { + return false; + } + + FSDataOutputStream os = fs.create(pidPath, false); + // write the hostname and the process id in the global cleaner pid file + final String ID = ManagementFactory.getRuntimeMXBean().getName(); + os.writeUTF(ID); + os.close(); + // add it to the delete-on-exit to ensure it gets deleted when the JVM + // exits + fs.deleteOnExit(pidPath); + } catch (IOException e) { + throw new YarnException(e); + } + LOG.info("Created the global cleaner pid file at " + pidPath.toString()); + return true; + } + + private void removeGlobalCleanerPidFile() { + try { + FileSystem fs = FileSystem.get(this.conf); + String root = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + Path pidPath = new Path(root, GLOBAL_CLEANER_PID); + + + fs.delete(pidPath, false); + LOG.info("Removed the global cleaner pid file at " + pidPath.toString()); + } catch (IOException e) { + LOG.error( + "Unable to 
remove the global cleaner pid file! The file may need " + + "to be removed manually.", e); + } + } + + private static int getInitialDelay(Configuration conf) { + int initialDelayInMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY, + YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY); + // negative value is invalid; fail fast instead of silently using the default + if (initialDelayInMinutes < 0) { + throw new HadoopIllegalArgumentException("Negative initial delay value: " + + initialDelayInMinutes + + ". The initial delay must be greater than or equal to zero."); + } + return initialDelayInMinutes; + } + + private static int getPeriod(Configuration conf) { + int periodInMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD, + YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD); + // non-positive value is invalid; fail fast instead of silently using the default + if (periodInMinutes <= 0) { + throw new HadoopIllegalArgumentException("Non-positive period value: " + + periodInMinutes + + ". The cleaner period must be greater than zero."); + } + return periodInMinutes; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java new file mode 100644 index 0000000..c6cce01 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference; + +/** + * The task that runs and cleans up the shared cache area for stale entries and + * orphaned files. It is expected that only one cleaner task runs at any given + * point in time. 
+ */ +@Private +@Evolving +class CleanerTask implements Runnable { + public static final String RENAMED_SUFFIX = "-renamed"; + private static final Log LOG = LogFactory.getLog(CleanerTask.class); + + private final String location; + private final long sleepTime; + private final int nestedLevel; + private final Path root; + private final FileSystem fs; + private final AppChecker appChecker; + private final SCMStore store; + private final CleanerMetrics metrics; + private final AtomicBoolean cleanerTaskIsRunning; + private final boolean isScheduledTask; + + /** + * Creates a cleaner task based on the configuration. This is provided for + * convenience. + * + * @param conf + * @param appChecker + * @param store + * @param metrics + * @param cleanerTaskRunning true if there is another cleaner task currently + * running + * @param isScheduledTask true if the task is a scheduled task + * @return an instance of a CleanerTask + */ + public static CleanerTask create(Configuration conf, AppChecker appChecker, + SCMStore store, CleanerMetrics metrics, AtomicBoolean cleanerTaskRunning, + boolean isScheduledTask) { + try { + // get the root directory for the shared cache + String location = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + long sleepTime = + conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP, + YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP); + int nestedLevel = SharedCacheUtil.getCacheDepth(conf); + FileSystem fs = FileSystem.get(conf); + + return new CleanerTask(location, sleepTime, nestedLevel, fs, appChecker, + store, metrics, cleanerTaskRunning, isScheduledTask); + } catch (IOException e) { + LOG.error("Unable to obtain the filesystem for the cleaner service", e); + throw new ExceptionInInitializerError(e); + } + } + + /** + * Creates a cleaner task based on the root directory location and the + * filesystem. 
+ */ + CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs, + AppChecker appChecker, SCMStore store, CleanerMetrics metrics, + AtomicBoolean cleanerTaskIsRunning, boolean isScheduledTask) { + this.location = location; + this.sleepTime = sleepTime; + this.nestedLevel = nestedLevel; + this.root = new Path(location); + this.fs = fs; + this.store = store; + this.appChecker = appChecker; + this.metrics = metrics; + this.cleanerTaskIsRunning = cleanerTaskIsRunning; + this.isScheduledTask = isScheduledTask; + } + + public void run() { + // check if it is a scheduled task + if (isScheduledTask + && !this.cleanerTaskIsRunning.compareAndSet(false, true)) { + // this is a scheduled task and there is already another task running + LOG.warn("A cleaner task is already running. " + + "This scheduled cleaner task will do nothing."); + return; + } + + try { + if (!fs.exists(root)) { + LOG.error("The shared cache root " + location + " was not found. " + + "The cleaner task will do nothing."); + return; + } + + // we're now ready to process the shared cache area + process(); + } catch (IOException e) { + LOG.error("Unexpected exception while initializing the cleaner task. " + + "This task will do nothing,", e); + } finally { + // this is set to false regardless of if it is a scheduled or on-demand + // task + this.cleanerTaskIsRunning.set(false); + } + } + + /** + * Sweeps and processes the shared cache area to clean up stale and orphaned + * files. + */ + void process() { + // mark the beginning of the run in the metrics + metrics.reportCleaningStart(); + try { + // now traverse individual directories and process them + // the directory structure is specified by the nested level parameter + // (e.g. 
9/c/d/) + StringBuilder pattern = new StringBuilder(); + for (int i = 0; i < nestedLevel; i++) { + pattern.append("*/"); + } + pattern.append("*"); + FileStatus[] resources = + fs.globStatus(new Path(root, pattern.toString())); + int numResources = resources == null ? 0 : resources.length; + LOG.info("Processing " + numResources + " resources in the shared cache"); + long beginNano = System.nanoTime(); + if (resources != null) { + for (FileStatus resource : resources) { + // check for interruption so it can abort in a timely manner in case + // of shutdown + if (Thread.currentThread().isInterrupted()) { + LOG.warn("The cleaner task was interrupted. Aborting."); + break; + } + + if (resource.isDirectory()) { + processSingleResource(resource); + } else { + LOG.warn("Invalid file at path " + resource.getPath().toString() + + + " when a directory was expected"); + } + // add sleep time between cleaning each directory if it is non-zero + if (sleepTime > 0) { + Thread.sleep(sleepTime); + } + } + } + long endNano = System.nanoTime(); + long durationMs = TimeUnit.NANOSECONDS.toMillis(endNano - beginNano); + LOG.info("Processed " + numResources + " resource(s) in " + durationMs + + + " ms."); + } catch (IOException e1) { + LOG.error("Unable to complete the cleaner task", e1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); // restore the interrupt + } + } + + /** + * Returns a path for the root directory for the shared cache. + */ + Path getRootPath() { + return root; + } + + /** + * Processes a single shared cache resource directory. 
+ */ + void processSingleResource(FileStatus resource) { + Path path = resource.getPath(); + // indicates the processing status of the resource + ResourceStatus resourceStatus = ResourceStatus.INIT; + + // first, if the path ends with the renamed suffix, it indicates the + // directory was moved (as stale) but somehow not deleted (probably due to + // SCM failure); delete the directory + if (path.toString().endsWith(RENAMED_SUFFIX)) { + LOG.info("Found a renamed directory that was left undeleted at " + + path.toString() + ". Deleting."); + try { + if (fs.delete(path, true)) { + resourceStatus = ResourceStatus.DELETED; + } + } catch (IOException e) { + LOG.error("Error while processing a shared cache resource: " + path, e); + } + } else { + // this is the path to the cache resource directory + // the directory name is the resource key (i.e. a unique identifier) + String key = path.getName(); + + try { + cleanResourceReferences(key); + } catch (YarnException e) { + LOG.error("Exception thrown while removing dead appIds.", e); + } + + if (store.isResourceEvictable(key, resource)) { + try { + /* + * TODO See YARN-2663: There is a race condition between + * store.removeResource(key) and + * removeResourceFromCacheFileSystem(path) operations because they do + * not happen atomically and resources can be uploaded with different + * file names by the node managers. + */ + // remove the resource from scm (checks for appIds as well) + if (store.removeResource(key)) { + // remove the resource from the file system + boolean deleted = removeResourceFromCacheFileSystem(path); + if (deleted) { + resourceStatus = ResourceStatus.DELETED; + } else { + LOG.error("Failed to remove path from the file system." 
+ + " Skipping this resource: " + path); + resourceStatus = ResourceStatus.ERROR; + } + } else { + // we did not delete the resource because it contained application + // ids + resourceStatus = ResourceStatus.PROCESSED; + } + } catch (IOException e) { + LOG.error( + "Failed to remove path from the file system. Skipping this resource: " + + path, e); + resourceStatus = ResourceStatus.ERROR; + } + } else { + resourceStatus = ResourceStatus.PROCESSED; + } + } + + // record the processing + switch (resourceStatus) { + case DELETED: + metrics.reportAFileDelete(); + break; + case PROCESSED: + metrics.reportAFileProcess(); + break; + case ERROR: + metrics.reportAFileError(); + break; + default: + LOG.error("Cleaner encountered an invalid status (" + resourceStatus + + ") while processing resource: " + path.getName()); + } + } + + private boolean removeResourceFromCacheFileSystem(Path path) + throws IOException { + // rename the directory to make the delete atomic + Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX); + if (fs.rename(path, renamedPath)) { + // the directory can be removed safely now + // log the original path + LOG.info("Deleting " + path.toString()); + return fs.delete(renamedPath, true); + } else { + // we were unable to remove it for some reason: it's best to leave + // it at that + LOG.error("We were not able to rename the directory to " + + renamedPath.toString() + ". We will leave it intact."); + } + return false; + } + + /** + * Clean all resource references to a cache resource that contain application + * ids pointing to finished applications. If the resource key does not exist, + * do nothing. 
+ * + * @param key a unique identifier for a resource + * @throws YarnException + */ + private void cleanResourceReferences(String key) throws YarnException { + Collection refs = + store.getResourceReferences(key); + if (!refs.isEmpty()) { + Set refsToRemove = + new HashSet(); + for (SharedCacheResourceReference r : refs) { + if (!appChecker.isApplicationActive(r.getAppId())) { + // application in resource reference is dead, it is safe to remove the + // reference + refsToRemove.add(r); + } + } + if (refsToRemove.size() > 0) { + store.removeResourceReferences(key, refsToRemove, false); + } + } + } + + /** + * A status indicating what happened with the processing of a given cache + * resource. + */ + private enum ResourceStatus { + INIT, + /** Resource was successfully processed, but not deleted **/ + PROCESSED, + /** Resource was successfully deleted **/ + DELETED, + /** The cleaner task ran into an error while processing the resource **/ + ERROR; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 2f3ddb1..3fdb588 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -64,6 +64,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.store = createSCMStoreService(conf); addService(store); + CleanerService cs = createCleanerService(store); + addService(cs); + // init metrics 
DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -90,6 +93,10 @@ private static SCMStore createSCMStoreService(Configuration conf) { return store; } + private CleanerService createCleanerService(SCMStore store) { + return new CleanerService(store); + } + @Override protected void serviceStop() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java new file mode 100644 index 0000000..18f3363 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsAnnotations; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +/** + * This class is for maintaining the various Cleaner activity statistics and + * publishing them through the metrics interfaces. 
+ */ +@Private +@Evolving +@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn") +public class CleanerMetrics { + public static final Log LOG = LogFactory.getLog(CleanerMetrics.class); + private final MetricsRegistry registry = new MetricsRegistry("cleaner"); + + enum Singleton { + INSTANCE; + + CleanerMetrics impl; + + synchronized CleanerMetrics init(Configuration conf) { + if (impl == null) { + impl = create(conf); + } + return impl; + } + } + + public static CleanerMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static CleanerMetrics getInstance() { + CleanerMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The CleanerMetrics singleton instance is not initialized." + + " Have you called init first?"); + return topMetrics; + } + + @Metric("number of deleted files over all runs") + private MutableCounterLong totalDeletedFiles; + + public long getTotalDeletedFiles() { + return totalDeletedFiles.value(); + } + + private @Metric("number of deleted files in the last run") + MutableGaugeLong deletedFiles; + + public long getDeletedFiles() { + return deletedFiles.value(); + } + + private @Metric("number of processed files over all runs") + MutableCounterLong totalProcessedFiles; + + public long getTotalProcessedFiles() { + return totalProcessedFiles.value(); + } + + private @Metric("number of processed files in the last run") + MutableGaugeLong processedFiles; + + public long getProcessedFiles() { + return processedFiles.value(); + } + + @Metric("number of file errors over all runs") + private MutableCounterLong totalFileErrors; + + public long getTotalFileErrors() { + return totalFileErrors.value(); + } + + private @Metric("number of file errors in the last run") + MutableGaugeLong fileErrors; + + public long getFileErrors() { + return fileErrors.value(); + } + + private @Metric("Rate of deleting the files over all runs") + 
MutableGaugeInt totalDeleteRate; + + public int getTotalDeleteRate() { + return totalDeleteRate.value(); + } + + private @Metric("Rate of deleting the files over the last run") + MutableGaugeInt deleteRate; + + public int getDeleteRate() { + return deleteRate.value(); + } + + private @Metric("Rate of processing the files over all runs") + MutableGaugeInt totalProcessRate; + + public int getTotalProcessRate() { + return totalProcessRate.value(); + } + + private @Metric("Rate of processing the files over the last run") + MutableGaugeInt processRate; + + public int getProcessRate() { + return processRate.value(); + } + + private @Metric("Rate of file errors over all runs") + MutableGaugeInt totalErrorRate; + + public int getTotalErrorRate() { + return totalErrorRate.value(); + } + + private @Metric("Rate of file errors over the last run") + MutableGaugeInt errorRate; + + public int getErrorRate() { + return errorRate.value(); + } + + CleanerMetrics() { + } + + /** + * The metric source obtained after parsing the annotations + */ + MetricsSource metricSource; + + /** + * The start of the last run of cleaner in ms + */ + private AtomicLong lastRunStart = new AtomicLong(System.currentTimeMillis()); + + /** + * The end of the last run of cleaner in ms + */ + private AtomicLong lastRunEnd = new AtomicLong(System.currentTimeMillis()); + + /** + * The sum of the durations of the last runs + */ + private AtomicLong sumOfPastPeriods = new AtomicLong(0); + + public MetricsSource getMetricSource() { + return metricSource; + } + + static CleanerMetrics create(Configuration conf) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + CleanerMetrics metricObject = new CleanerMetrics(); + MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject); + final MetricsSource s = sb.build(); + ms.register("cleaner", "The cleaner service of truly shared cache", s); + metricObject.metricSource = s; + return metricObject; + } + + /** + * Report a delete operation at the 
current system time + */ + public void reportAFileDelete() { + long time = System.currentTimeMillis(); + reportAFileDelete(time); + } + + /** + * Report a delete operation at the specified time. + * Delete implies process as well. + * @param time + */ + public void reportAFileDelete(long time) { + totalProcessedFiles.incr(); + processedFiles.incr(); + totalDeletedFiles.incr(); + deletedFiles.incr(); + updateRates(time); + lastRunEnd.set(time); + } + + /** + * Report a process operation at the current system time + */ + public void reportAFileProcess() { + long time = System.currentTimeMillis(); + reportAFileProcess(time); + } + + /** + * Report a process operation at the specified time + * + * @param time + */ + public void reportAFileProcess(long time) { + totalProcessedFiles.incr(); + processedFiles.incr(); + updateRates(time); + lastRunEnd.set(time); + } + + /** + * Report a process operation error at the current system time + */ + public void reportAFileError() { + long time = System.currentTimeMillis(); + reportAFileError(time); + } + + /** + * Report a process operation error at the specified time + * + * @param time + */ + public void reportAFileError(long time) { + totalProcessedFiles.incr(); + processedFiles.incr(); + totalFileErrors.incr(); + fileErrors.incr(); + updateRates(time); + lastRunEnd.set(time); + } + + private void updateRates(long time) { + long startTime = lastRunStart.get(); + long lastPeriod = time - startTime; + long sumPeriods = sumOfPastPeriods.get() + lastPeriod; + float lastRunProcessRate = ((float) processedFiles.value()) / lastPeriod; + processRate.set(ratePerMsToHour(lastRunProcessRate)); + float totalProcessRateMs = ((float) totalProcessedFiles.value()) / sumPeriods; + totalProcessRate.set(ratePerMsToHour(totalProcessRateMs)); + float lastRunDeleteRate = ((float) deletedFiles.value()) / lastPeriod; + deleteRate.set(ratePerMsToHour(lastRunDeleteRate)); + float totalDeleteRateMs = ((float) totalDeletedFiles.value()) / sumPeriods; + 
totalDeleteRate.set(ratePerMsToHour(totalDeleteRateMs)); + float lastRunErrorRateMs = ((float) fileErrors.value()) / lastPeriod; + errorRate.set(ratePerMsToHour(lastRunErrorRateMs)); + float totalErrorRateMs = ((float) totalFileErrors.value()) / sumPeriods; + totalErrorRate.set(ratePerMsToHour(totalErrorRateMs)); + } + + /** + * Report the start a new run of the cleaner at the current system time + */ + public void reportCleaningStart() { + long time = System.currentTimeMillis(); + reportCleaningStart(time); + } + + /** + * Report the start a new run of the cleaner at the specified time + * + * @param time + */ + public void reportCleaningStart(long time) { + long lastPeriod = lastRunEnd.get() - lastRunStart.get(); + if (lastPeriod < 0) { + LOG.error("No operation since last start!"); + lastPeriod = 0; + } + lastRunStart.set(time); + processedFiles.set(0); + deletedFiles.set(0); + processRate.set(0); + deleteRate.set(0); + errorRate.set(0); + sumOfPastPeriods.addAndGet(lastPeriod); + registry.tag(SessionId, Long.toString(time), true); + } + + static int ratePerMsToHour(float ratePerMs) { + float ratePerHour = ratePerMs * 1000 * 3600; + return (int) ratePerHour; + } + + public static boolean isCleanerMetricRecord(String name) { + return (name.startsWith("cleaner")); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java new file mode 100644 index 0000000..d853922 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetricsCollector.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software 
* Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.impl.MsInfo;

/**
 * Very simple metric collector for the cleaner service
 */
@Private
@Evolving
public class CleanerMetricsCollector implements MetricsCollector {

  /**
   * Metric name to latest reported value, accumulated across all records fed
   * to this collector.
   */
  Map<String, Number> metrics = new HashMap<String, Number>();
  // SessionId tag of the last cleaner record seen, or null if none yet.
  String sessionId = null;

  /**
   * Route cleaner records to a recording builder; every other record gets a
   * builder that discards everything.
   */
  @Override
  public MetricsRecordBuilder addRecord(String name) {
    if (CleanerMetrics.isCleanerMetricRecord(name)) {
      return new CleanerMetricsRecordBuilder(metrics);
    } else {
      return new NullMetricsRecordBuilder();
    }
  }

  @Override
  public MetricsRecordBuilder addRecord(MetricsInfo info) {
    return addRecord(info.name());
  }

  /** @return the collected metric name to value map */
  public Map<String, Number> getMetrics() {
    return metrics;
  }

  /** @return the session id of the last cleaner record, or null */
  public String getSessionId() {
    return sessionId;
  }

  /**
   * A builder which ignores all the added metrics, tags, etc.
   */
  class NullMetricsRecordBuilder extends MetricsRecordBuilder {

    @Override
    public MetricsRecordBuilder tag(MetricsInfo info, String value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder add(MetricsTag tag) {
      return this;
    }

    @Override
    public MetricsRecordBuilder add(AbstractMetric metric) {
      return this;
    }

    @Override
    public MetricsRecordBuilder setContext(String value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
      return this;
    }

    @Override
    public MetricsCollector parent() {
      return CleanerMetricsCollector.this;
    }
  }

  /**
   * A builder which keeps track of only counters and gauges (and the
   * SessionId tag), discarding everything else via its null superclass.
   */
  @Private
  @Evolving
  class CleanerMetricsRecordBuilder extends NullMetricsRecordBuilder {
    Map<String, Number> metricValueMap;

    public CleanerMetricsRecordBuilder(Map<String, Number> metricValueMap) {
      this.metricValueMap = metricValueMap;
    }

    @Override
    public MetricsRecordBuilder tag(MetricsInfo info, String value) {
      if (MsInfo.SessionId.equals(info)) {
        setSessionId(value);
      }
      return this;
    }

    @Override
    public MetricsRecordBuilder add(MetricsTag tag) {
      if (MsInfo.SessionId.equals(tag.info())) {
        setSessionId(tag.value());
      }
      return this;
    }

    private void setSessionId(String value) {
      sessionId = value;
    }

    @Override
    public MetricsRecordBuilder add(AbstractMetric metric) {
      metricValueMap.put(metric.name(), metric.value());
      return this;
    }

    @Override
    public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
      metricValueMap.put(info.name(), value);
      return this;
    }

    @Override
    public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
      metricValueMap.put(info.name(), value);
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
      metricValueMap.put(info.name(), value);
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
      metricValueMap.put(info.name(), value);
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
      metricValueMap.put(info.name(), value);
      return this;
    }

    @Override
    public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
      metricValueMap.put(info.name(), value);
      return this;
    }

    @Override
    public MetricsCollector parent() {
      return CleanerMetricsCollector.this;
    }

  }

}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
new file mode 100644
index 0000000..c910211
--- /dev/null
+++
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.Test; + +public class TestCleanerTask { + private static final String ROOT = + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT; + private static final long SLEEP_TIME = + YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP; + private static final int NESTED_LEVEL = + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + + @Test + public void testNonExistentRoot() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + // the shared cache root does not exist + when(fs.exists(task.getRootPath())).thenReturn(false); + + task.run(); + + // process() should not be called + verify(task, never()).process(); + } + + @Test + public void testProcessFreshResource() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = 
+ createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + + // mock a resource that is not evictable + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(false); + FileStatus status = mock(FileStatus.class); + when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + + // process the resource + task.processSingleResource(status); + + // the directory should not be renamed + verify(fs, never()).rename(eq(status.getPath()), isA(Path.class)); + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } + + @Test + public void testProcessEvictableResource() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + when(appChecker.isApplicationActive(isA(ApplicationId.class))).thenReturn( + false); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + + // mock an evictable resource + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(true); + FileStatus status = mock(FileStatus.class); + when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + when(store.removeResource(isA(String.class))).thenReturn(true); + // rename succeeds + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + // delete returns true + when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true); + + // process the resource + task.processSingleResource(status); + + // the directory should be renamed + verify(fs).rename(eq(status.getPath()), isA(Path.class)); + // metrics should record a deleted file + verify(metrics).reportAFileDelete(); + verify(metrics, never()).reportAFileProcess(); + } + + private CleanerTask createSpiedTask(FileSystem fs, AppChecker appChecker, + SCMStore store, 
CleanerMetrics metrics, AtomicBoolean isCleanerRunning) { + return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, appChecker, + store, metrics, isCleanerRunning, true)); + } + + @Test + public void testResourceIsInUseHasAnActiveApp() throws Exception { + FileSystem fs = mock(FileSystem.class); + AppChecker appChecker = mock(AppChecker.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + FileStatus resource = mock(FileStatus.class); + when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + // resource is stale + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(true); + // but still has appIds + when(store.removeResource(isA(String.class))).thenReturn(false); + + CleanerTask task = + createSpiedTask(fs, appChecker, store, metrics, new AtomicBoolean()); + + // process the resource + task.processSingleResource(resource); + + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java new file mode 100644 index 0000000..0aaa3bf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
* The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests {@link CleanerMetrics} by driving it with a manually advanced clock
 * and checking the per-run and cumulative counters and rates.
 */
public class TestCleanerMetrics {

  Configuration conf = new Configuration();
  // simulated wall clock, advanced by hand inside each simulated run
  long now = System.currentTimeMillis();
  // durations are floats so the expected rates use floating-point division
  float lastRunMs = 0;
  float allRunsMs = 0;
  CleanerMetrics metrics;

  @Before
  public void init() {
    CleanerMetrics.initSingleton(conf);
    metrics = CleanerMetrics.getInstance();
  }

  @Test
  public void testMetricsOverMultiplePeriods() {
    // first run: per-run and cumulative values coincide
    simulateACleanerRun();
    assertMetrics(4, 4, 1, 1);
    // idle gap between runs, then a second identical run; per-run values
    // reset while the cumulative ones keep growing
    now += 1300;
    simulateACleanerRun();
    assertMetrics(4, 8, 1, 2);
  }

  /**
   * Simulates one 10 ms cleaner run: three plain processes plus one delete
   * (which also counts as a process), so 4 processed / 1 deleted per run.
   */
  public void simulateACleanerRun() {
    long runStart = now;
    metrics.reportCleaningStart(now);
    now += 9;
    metrics.reportAFileProcess(now);
    metrics.reportAFileDelete(now);
    now++;
    metrics.reportAFileProcess(now);
    metrics.reportAFileProcess(now);
    lastRunMs = now - runStart;
    allRunsMs += lastRunMs;
  }

  /** Asserts the per-run and cumulative file counts and derived rates. */
  void assertMetrics(int processed, int totalProcessed, int deleted,
      int totalDeleted) {
    assertEquals(
        "Processed files in the last period are not measured correctly",
        processed, metrics.getProcessedFiles());
    assertEquals("Total processed files are not measured correctly",
        totalProcessed, metrics.getTotalProcessedFiles());
    assertEquals(
        "Deleted files in the last period are not measured correctly",
        deleted, metrics.getDeletedFiles());
    assertEquals("Total deleted files are not measured correctly",
        totalDeleted, metrics.getTotalDeletedFiles());

    assertEquals(
        "Rate of processed files in the last period are not measured correctly",
        CleanerMetrics.ratePerMsToHour(processed / lastRunMs),
        metrics.getProcessRate());
    assertEquals(
        "Rate of total processed files are not measured correctly",
        CleanerMetrics.ratePerMsToHour(totalProcessed / allRunsMs),
        metrics.getTotalProcessRate());
    assertEquals(
        "Rate of deleted files in the last period are not measured correctly",
        CleanerMetrics.ratePerMsToHour(deleted / lastRunMs),
        metrics.getDeleteRate());
    assertEquals(
        "Rate of total deleted files are not measured correctly",
        CleanerMetrics.ratePerMsToHour(totalDeleted / allRunsMs),
        metrics.getTotalDeleteRate());
  }
}