diff --git hadoop-yarn-project/hadoop-yarn/bin/yarn hadoop-yarn-project/hadoop-yarn/bin/yarn
index 2017d57..128ab91 100644
--- hadoop-yarn-project/hadoop-yarn/bin/yarn
+++ hadoop-yarn-project/hadoop-yarn/bin/yarn
@@ -33,6 +33,7 @@ function hadoop_usage
echo " resourcemanager run the ResourceManager"
echo " resourcemanager -format-state-store deletes the RMStateStore"
echo " rmadmin admin tools"
+ echo " sharedcachemanager run the SharedCacheManager daemon"
echo " timelineserver run the timeline server"
echo " version print the version"
echo " or"
@@ -138,6 +139,11 @@ case "${COMMAND}" in
JAVA_HEAP_MAX="-Xmx${YARN_TIMELINESERVER_HEAPSIZE}m"
fi
;;
+ sharedcachemanager)
+ daemon="true"
+ CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager'
+ YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
+ ;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d227e4f..38da16b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1230,6 +1230,28 @@
public static final String TIMELINE_SERVICE_KEYTAB =
TIMELINE_SERVICE_PREFIX + "keytab";
+ // ///////////////////////////////
+ // Shared Cache Configs
+ // ///////////////////////////////
+ public static final String SHARED_CACHE_PREFIX = "yarn.sharedcache.";
+
+ // common configs
+ /** whether the shared cache is enabled/disabled */
+ public static final String SHARED_CACHE_ENABLED = SHARED_CACHE_PREFIX +
+ "enabled";
+ public static final boolean DEFAULT_SHARED_CACHE_ENABLED = false;
+
+ /** The config key for the shared cache root directory. */
+ public static final String SHARED_CACHE_ROOT = SHARED_CACHE_PREFIX +
+ "root";
+ public static final String DEFAULT_SHARED_CACHE_ROOT = "/sharedcache";
+
+ /** The config key for the level of nested directories before getting to the
+ * checksum directory. */
+ public static final String SHARED_CACHE_NESTED_LEVEL = SHARED_CACHE_PREFIX +
+ "nested.level";
+ public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 55b3490..bc6e698 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1325,6 +1325,26 @@
org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
+
+
+ Whether the shared cache is enabled
+ yarn.sharedcache.enabled
+ false
+
+
+
+ The root directory for the shared cache
+ yarn.sharedcache.root
+ /sharedcache
+
+
+
+ The level of nested directories before getting to the checksum
+ directories. It must be non-negative.
+ yarn.sharedcache.nested.level
+ 3
+
+
The interval that the yarn client library uses to poll the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/CacheStructureUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/CacheStructureUtil.java
new file mode 100644
index 0000000..cd620c3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/CacheStructureUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * A utility class that contains helper methods for dealing with the internal
+ * shared cache structure.
+ *
+ */
+public class CacheStructureUtil {
+
+ private static final Log LOG = LogFactory.getLog(CacheStructureUtil.class);
+
+ public static int getCacheDepth(Configuration conf) {
+ int cacheDepth =
+ conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL);
+
+ if (cacheDepth <= 0) {
+ LOG.warn("Specified cache depth was less than or equal to zero."
+ + " Using default value instead. Default: "
+ + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL
+ + ", Specified: " + cacheDepth);
+ cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+ }
+
+ return cacheDepth;
+ }
+
+ public static String getCacheEntryPath(int cacheDepth, String cacheRoot,
+ String checksum) {
+
+ if (cacheDepth <= 0) {
+ throw new IllegalArgumentException(
+ "The cache depth must be greater than 0. Passed value: " + cacheDepth);
+ }
+ if (checksum.length() < cacheDepth) {
+ throw new IllegalArgumentException("The checksum passed was too short: "
+ + checksum);
+ }
+
+ // Build the cache entry path to the specified depth. For example, if the
+ // depth is 3 and the checksum is 3c4f, the path would be:
+ // SHARED_CACHE_ROOT/3/c/4/3c4f
+ StringBuilder sb = new StringBuilder(cacheRoot);
+ for (int i = 0; i < cacheDepth; i++) {
+ sb.append(Path.SEPARATOR_CHAR);
+ sb.append(checksum.charAt(i));
+ }
+ sb.append(Path.SEPARATOR_CHAR).append(checksum);
+
+ return sb.toString();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
new file mode 100644
index 0000000..0969274
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
@@ -0,0 +1,96 @@
+
+
+
+ 4.0.0
+
+ hadoop-yarn-server
+ org.apache.hadoop
+ 3.0.0-SNAPSHOT
+
+ org.apache.hadoop
+ hadoop-yarn-server-sharedcachemanager
+ 3.0.0-SNAPSHOT
+ hadoop-yarn-server-sharedcachemanager
+
+
+
+ ${project.parent.parent.basedir}
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-api
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-resourcemanager
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+ org.apache.hadoop
+ hadoop-common
+ test-jar
+ test
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-tests
+ test
+ test-jar
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-resourcemanager
+ test
+ test-jar
+
+
+
+
+
+
+
+
+ maven-jar-plugin
+
+
+
+ test-jar
+
+ test-compile
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java
new file mode 100644
index 0000000..f0f67ee
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * An interface for checking whether an app is running so that the cleaner
+ * service may determine if it can safely remove a cached entry.
+ */
+public interface AppChecker {
+ /**
+ * Returns whether the app is in the active state.
+ *
+ * @return true if the app is found and is not in one of the completed states;
+ * false otherwise
+ * @throws IOException if there is an error in determining the app state
+ */
+ boolean appIsActive(ApplicationId id) throws IOException;
+
+ /**
+ * Returns the list of all active apps at the given time.
+ *
+ * @return the list of active apps, or an empty list if there is none
+ * @throws IOException if there is an error in obtaining the list
+ */
+ Collection getAllActiveApps() throws IOException;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java
new file mode 100644
index 0000000..937fcc7
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+/**
+ * An implementation of AppChecker that queries the resource manager remotely to
+ * determine whether the app is running.
+ */
+class RemoteAppChecker implements AppChecker {
+ private static final Log LOG = LogFactory.getLog(RemoteAppChecker.class);
+ private static final EnumSet ACTIVE_STATES =
+ EnumSet.complementOf(EnumSet.of(YarnApplicationState.FINISHED,
+ YarnApplicationState.FAILED,
+ YarnApplicationState.KILLED));
+
+ private final ApplicationClientProtocol applicationsManager;
+
+ /**
+ * Creates an instance of RemoteAppChecker based on the configuration.
+ */
+ public static AppChecker create(Configuration conf) {
+ // create the RM proxy based on the configuration
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress rmAddress =
+ conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ ApplicationClientProtocol applicationsManager =
+ (ApplicationClientProtocol)rpc.getProxy(
+ ApplicationClientProtocol.class, rmAddress, conf);
+ LOG.info("Connected to ResourceManager at " + rmAddress);
+ return new RemoteAppChecker(applicationsManager);
+ }
+
+ RemoteAppChecker(ApplicationClientProtocol applicationsManager) {
+ this.applicationsManager = applicationsManager;
+ }
+
+ public boolean appIsActive(ApplicationId id) throws IOException {
+ GetApplicationReportRequest request =
+ GetApplicationReportRequest.newInstance(id);
+
+ try {
+ GetApplicationReportResponse response =
+ applicationsManager.getApplicationReport(request);
+ ApplicationReport report = response.getApplicationReport();
+ if (report == null) {
+ // the app does not exist
+ return false;
+ }
+
+ return ACTIVE_STATES.contains(report.getYarnApplicationState());
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public Collection getAllActiveApps() throws IOException {
+ GetApplicationsRequest request =
+ GetApplicationsRequest.newInstance(ACTIVE_STATES);
+
+ try {
+ GetApplicationsResponse response =
+ applicationsManager.getApplications(request);
+ List activeApps = new ArrayList();
+ List apps = response.getApplicationList();
+ for (ApplicationReport app: apps) {
+ activeApps.add(app.getApplicationId());
+ }
+ return activeApps;
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMContext.java
new file mode 100644
index 0000000..6c4f1e5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMContext.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+
+/**
+ * A context object for the shared cache manager.
+ */
+public class SCMContext {
+ private final long startTime;
+ private final Map initialCachedEntries;
+ private final Collection initialActiveApps;
+
+ public SCMContext() {
+ this(null, null);
+ }
+
+ public SCMContext(Map initialCachedEntries,
+ Collection initialActiveApps) {
+ this.startTime = System.currentTimeMillis();
+ this.initialCachedEntries = initialCachedEntries;
+ this.initialActiveApps = initialActiveApps;
+ }
+
+ /**
+ * Returns the start time when the SCM context was created.
+ */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ /**
+ * Returns the initial cached entries that was set when the SCM context was
+ * created. Note that the map is not thread safe. The user of this map is
+ * expected to provide thread safety or use it in a single-threaded manner.
+ *
+ * The user may remove entries from this map once it is consumed fully.
+ */
+ public Map getInitialCachedEntries() {
+ return initialCachedEntries;
+ }
+
+ /**
+ * Returns the initial active apps that was set when the SCM context was
+ * created. Note that this list is not thread safe. The user of this list is
+ * expected to provide thread safety or use it in a single-threaded manner.
+ *
+ * The user may remove entries from this list as apps are identified as no
+ * longer active.
+ */
+ public Collection getInitialActiveApps() {
+ return initialActiveApps;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
new file mode 100644
index 0000000..a3d525e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This service maintains the shared cache meta data. It handles claiming and
+ * releasing of resources, all rpc calls from the client to the shared cache
+ * manager, and administrative commands. It also persists the shared cache meta
+ * data to a backend store, and cleans up stale entries on a regular basis.
+ */
+public class SharedCacheManager extends CompositeService {
+ /**
+ * Priority of the SharedCacheManager shutdown hook.
+ */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
+
+ private Configuration conf;
+
+ public SharedCacheManager() {
+ super("SharedCacheManager");
+ }
+
+ @Override
+ protected synchronized void serviceInit(Configuration conf) throws Exception {
+ this.conf = conf;
+
+ AppChecker appChecker = RemoteAppChecker.create(conf);
+
+ try {
+ SCMContext context = createSCMContext(appChecker, conf);
+
+ } catch (IOException e) {
+ LOG.error("Encountered unexpected exception while initializing the shared cache manager",
+ e);
+ throw new YarnRuntimeException(e);
+ }
+
+ super.serviceInit(conf);
+ }
+
+ @VisibleForTesting
+ SCMContext createSCMContext(AppChecker appChecker, Configuration conf)
+ throws IOException {
+ // obtain the list of active apps as part of the context
+ LOG.info("Getting the active app list to initialize the SCM context");
+ Collection activeAppList = getActiveAppList(appChecker);
+ LOG.info(activeAppList.size() + " apps recorded as active at this time");
+
+ // obtain the list of cached entries from the filesystem
+ LOG.info("Getting the list of all cached entries from the filesystem");
+ FileSystem fs = FileSystem.get(conf);
+ Map initialCachedEntries = getInitialCachedEntries(fs, conf);
+ LOG.info(initialCachedEntries.size() +
+ " entries obtained from the filesystem");
+
+ SCMContext context = new SCMContext(initialCachedEntries, activeAppList);
+ return context;
+ }
+
+ @VisibleForTesting
+ Collection getActiveAppList(AppChecker appChecker)
+ throws IOException {
+ return appChecker.getAllActiveApps();
+ }
+
+ @VisibleForTesting
+ Map getInitialCachedEntries(FileSystem fs, Configuration conf)
+ throws IOException {
+ // get the root directory for the shared cache
+ String location =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ Path root = new Path(location);
+ if (!fs.exists(root)) {
+ String message = "The shared cache root " + location + " was not found";
+ LOG.error(message);
+ throw new IOException(message);
+ }
+
+ int nestedLevel = CacheStructureUtil.getCacheDepth(conf);
+ // now traverse individual directories and process them
+ // the directory structure is specified by the nested level parameter
+ // (e.g. 9/c/d//file)
+ StringBuilder pattern = new StringBuilder();
+ for (int i = 0; i < nestedLevel+1; i++) {
+ pattern.append("*/");
+ }
+ pattern.append("*");
+
+ LOG.info("Querying for all individual cached entry files");
+ FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+ int numEntries = entries == null ? 0 : entries.length;
+ LOG.info("Found " + numEntries + " files: processing for one entity per "
+ + "checksum");
+
+ Map initialCachedEntries = new HashMap();
+ for (FileStatus entry: entries) {
+ Path file = entry.getPath();
+ String fileName = file.getName();
+ if (entry.isFile()) {
+ // get the parent to get the checksum
+ Path parent = file.getParent();
+ if (parent != null) {
+ // the name of the immediate parent directory is the checksum
+ String key = parent.getName();
+ // make sure we insert only one file per checksum whichever comes
+ // first
+ String mapped = initialCachedEntries.get(key);
+ if (mapped != null) {
+ LOG.warn("Key " + key + " is already mapped to file " + mapped
+ + "; file " + fileName + " will not be added");
+ } else {
+ initialCachedEntries.put(key, fileName);
+ }
+ }
+ }
+ }
+ LOG.info("A total of " + initialCachedEntries.size() +
+ " files are now mapped");
+ return initialCachedEntries;
+ }
+
+ @Override
+ protected synchronized void serviceStart() throws Exception {
+ // Start metrics
+ DefaultMetricsSystem.initialize("SharedCacheManager");
+ JvmMetrics.initSingleton("SharedCacheManager", null);
+
+ super.serviceStart();
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+
+ DefaultMetricsSystem.shutdown();
+ super.serviceStop();
+ }
+
+ public static void main(String[] args) {
+ Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
+ try {
+ Configuration conf = new YarnConfiguration();
+ SharedCacheManager sharedCacheManager = new SharedCacheManager();
+ ShutdownHookManager.get().addShutdownHook(
+ new CompositeServiceShutdownHook(sharedCacheManager),
+ SHUTDOWN_HOOK_PRIORITY);
+ sharedCacheManager.init(conf);
+ sharedCacheManager.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting SharedCacheManager", t);
+ System.exit(-1);
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java
new file mode 100644
index 0000000..6768d9d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.junit.Test;
+
+public class TestRemoteAppChecker {
+
+ @Test
+ public void testNonExistentApp() throws Exception {
+ ApplicationClientProtocol applicationsManager = mock(ApplicationClientProtocol.class);
+ GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
+ // mock the response so that it returns a null application report
+ when(response.getApplicationReport()).thenReturn(null);
+ when(applicationsManager.getApplicationReport(isA(GetApplicationReportRequest.class))).
+ thenReturn(response);
+
+ AppChecker appChecker = new RemoteAppChecker(applicationsManager);
+ assertFalse(appChecker.appIsActive(isA(ApplicationId.class)));
+ }
+
+ @Test
+ public void testRunningApp() throws Exception {
+ ApplicationClientProtocol applicationsManager = mock(ApplicationClientProtocol.class);
+ GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
+ ApplicationReport report = mock(ApplicationReport.class);
+ when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.ACCEPTED);
+ // mock the response so that it returns an application report in the
+ // accepted state
+ when(response.getApplicationReport()).thenReturn(report);
+ when(applicationsManager.getApplicationReport(isA(GetApplicationReportRequest.class))).
+ thenReturn(response);
+
+ AppChecker appChecker = new RemoteAppChecker(applicationsManager);
+ assertTrue(appChecker.appIsActive(isA(ApplicationId.class)));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheManager.java
new file mode 100644
index 0000000..4be80d5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSharedCacheManager {
+ private static final Configuration conf = new YarnConfiguration();
+
+ private static final FileSystem fs;
+
+ private static final String KEY1 = "abcdefgh";
+ private static final String FILE1 = "foo";
+ private static final String KEY2 = "ijklmnop";
+ private static final String FILE2 = "bar";
+
+ private static final Path baseDir;
+ private static final String root;
+ private static final String path1;
+ private static final String path2;
+
+ static {
+ try {
+ fs = FileSystem.getLocal(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ baseDir =
+ new Path("target", TestSharedCacheManager.class.getSimpleName())
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ root = baseDir.toUri().getPath();
+ // set the shared cache root
+ conf.set(YarnConfiguration.SHARED_CACHE_ROOT, root);
+
+ path1 = getFullPath(KEY1, FILE1);
+ path2 = getFullPath(KEY2, FILE2);
+ }
+
+ private static String getFullPath(String key, String fileName) {
+ int cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+ return CacheStructureUtil.getCacheEntryPath(cacheDepth, root, key) +
+ Path.SEPARATOR + fileName;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ FileContext files = FileContext.getLocalFSFileContext();
+ files.mkdir(baseDir, null, true);
+ // add a couple of directories and files
+ createFile(new Path(path1), files);
+ createFile(new Path(path2), files);
+ }
+
+ @After
+ public void shutDown() throws IOException {
+ FileContext files = FileContext.getLocalFSFileContext();
+ files.delete(baseDir, true);
+ }
+
+ private void createFile(Path path, FileContext files) throws IOException {
+ Path parent = path.getParent();
+ files.mkdir(parent, null, true);
+ FSDataOutputStream out = null;
+ try {
+ out = files.create(path, EnumSet.of(CREATE, OVERWRITE));
+ out.writeUTF("This is a test");
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ @Test
+ public void testCreateSCMContext() throws Exception {
+ SharedCacheManager scm = new SharedCacheManager();
+
+ AppChecker appChecker = mock(AppChecker.class);
+ List ids = new ArrayList();
+ ids.add(createAppId(1, 1L));
+ ids.add(createAppId(2, 1L));
+ ids.add(createAppId(3, 1L));
+ when(appChecker.getAllActiveApps()).thenReturn(ids);
+
+ SCMContext context = scm.createSCMContext(appChecker, conf);
+ Map initialCachedEntries = context.getInitialCachedEntries();
+ // verify the initial cached entries
+ assertSame(2, initialCachedEntries.size());
+ assertEquals(FILE1, initialCachedEntries.get(KEY1));
+ assertEquals(FILE2, initialCachedEntries.get(KEY2));
+
+ // verify the active apps
+ assertSame(ids, context.getInitialActiveApps());
+ }
+
+ private ApplicationId createAppId(int id, long timestamp) {
+ return ApplicationId.newInstance(timestamp, id);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index b635d10..886773a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -39,6 +39,7 @@
hadoop-yarn-server-nodemanager
hadoop-yarn-server-web-proxy
hadoop-yarn-server-resourcemanager
+ hadoop-yarn-server-sharedcachemanager
hadoop-yarn-server-tests
hadoop-yarn-server-applicationhistoryservice