diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0edf70a..eb30cf4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1261,6 +1261,13 @@
"nested.level";
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
+ // Shared Cache Manager Configs
+ public static final String SCM_PREFIX = SHARED_CACHE_PREFIX + "manager.";
+
+ public static final String SCM_STORE_IMPL = SCM_PREFIX + "store.impl";
+ public static final String DEFAULT_SCM_STORE_IMPL =
+ "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore";
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index baa533b..1ea1e7d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1358,6 +1358,12 @@
yarn.sharedcache.nested.level
3
+
+
+ The implementation to be used for the SCM store
+ yarn.sharedcache.manager.store.impl
+ org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 6a85129..f653adf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
import com.google.common.annotations.VisibleForTesting;
@@ -58,6 +59,7 @@
private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
private Configuration conf;
+ private SCMStore store;
public SharedCacheManager() {
super("SharedCacheManager");
@@ -72,6 +74,9 @@ protected void serviceInit(Configuration conf) throws Exception {
try {
SCMContext context = createSCMContext(appChecker, conf);
+ this.store = createSCMStoreService(conf, context);
+ addService(store);
+
} catch (IOException e) {
LOG.error("Encountered unexpected exception while initializing the shared cache manager",
e);
@@ -165,6 +170,22 @@ SCMContext createSCMContext(AppChecker appChecker, Configuration conf)
return initialCachedEntries;
}
+ private static SCMStore createSCMStoreService(Configuration conf,
+ SCMContext context) {
+ String className =
+ conf.get(YarnConfiguration.SCM_STORE_IMPL,
+ YarnConfiguration.DEFAULT_SCM_STORE_IMPL);
+ SCMStore store = null;
+ try {
+ Class> clazz = Class.forName(className);
+ Constructor> cstr = clazz.getConstructor(SCMContext.class);
+ store = (SCMStore) cstr.newInstance(context);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ return store;
+ }
+
@Override
protected void serviceStart() throws Exception {
// Start metrics
@@ -181,6 +202,14 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
+ /**
+ * For testing purposes only.
+ */
+ @VisibleForTesting
+ SCMStore getSCMStore() {
+ return this.store;
+ }
+
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/Entry.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/Entry.java
new file mode 100644
index 0000000..f527bbc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/Entry.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Class that encapsulates the cache entry.
+ *
+ * The instances are not thread safe. Any operation that uses the entry must
+ * use thread-safe mechanisms to ensure safe access with the only exception of
+ * the filename.
+ */
+class Entry {
+ private long accessTime;
+ private final Set refs;
+ private final String fileName;
+
+ Entry(String fileName) {
+ this.accessTime = System.currentTimeMillis();
+ this.refs = new HashSet();
+ this.fileName = fileName;
+ }
+
+ long getAccessTime() {
+ return accessTime;
+ }
+
+ void updateAccessTime() {
+ accessTime = System.currentTimeMillis();
+ }
+
+ String getFileName() {
+ return this.fileName;
+ }
+
+ Set getResourceReferences() {
+ return this.refs;
+ }
+
+ boolean addReference(ResourceReference ref) {
+ return this.refs.add(ref);
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
new file mode 100644
index 0000000..ec64e1b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.sharedcachemanager.SCMContext;
+
+/**
+ * A thread safe version of an in-memory SCM store. The thread safety is
+ * implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap
+ * is used to allow concurrency to entries and their mapping, and (2) a key
+ * level lock is used to ensure mutual exclusion of any operation that accesses
+ * the entry under the same key.
+ *
+ * To ensure safe key-level locking, we use the original string key and intern
+ * it weakly using hadoop's StringInterner. It avoids the pitfalls
+ * of using built-in String interning. The interned strings are also weakly
+ * referenced, so it can be garbage collected once it is done. And there is
+ * little risk of keys being available for other parts of the code so they can
+ * be used as locks accidentally.
+ */
+public class InMemorySCMStore extends SCMStore {
+ private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);
+
+ private final Map map = new ConcurrentHashMap();
+
+ public InMemorySCMStore(SCMContext context) {
+ super(InMemorySCMStore.class.getName(), context);
+ // bootstrap itself
+ bootstrap(context);
+ }
+
+ private String intern(String key) {
+ return StringInterner.weakIntern(key);
+ }
+
+ /**
+ * The in-memory store bootstraps itself from the shared cache entries that
+ * exist in HDFS. If that information is passed to it via
+ * SCMContext, it starts with that state.
+ */
+ private void bootstrap(SCMContext context) {
+ Map initialCachedEntries = context.getInitialCachedEntries();
+ if (initialCachedEntries != null) {
+ LOG.info("Bootstrapping from " + initialCachedEntries.size() +
+ " entries from the storage");
+ Iterator> it =
+ initialCachedEntries.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry e = it.next();
+ String key = intern(e.getKey());
+ String fileName = e.getValue();
+ Entry entry = new Entry(fileName);
+ // we don't hold the lock for this as it is done as part of the
+ // constructor
+ map.put(key, entry);
+ // clear out the entry to reduce the footprint
+ it.remove();
+ }
+ LOG.info("Bootstrapping complete");
+ }
+ }
+
+ /**
+ * Adds the given resource to the store under the key and the filename. If the
+ * entry is already found, it returns the existing filename. It represents the
+ * state of the store at the time of this query. The entry may change or even
+ * be removed once this method returns. The caller should be prepared to
+ * handle that situation.
+ *
+ * @return the filename of the newly inserted entry or that of the existing
+ * entry
+ */
+ @Override
+ public String addKey(String key, String fileName) {
+ String interned = intern(key);
+ synchronized (interned) {
+ Entry entry = map.get(interned);
+ if (entry == null) {
+ entry = new Entry(fileName);
+ map.put(interned, entry);
+ }
+ return entry.getFileName();
+ }
+ }
+
+ /**
+ * Adds the provided resource reference to the cache entry under the key, and
+ * updates the access time. If it returns a non-null value, the caller may
+ * safely assume that the entry will not be removed at least until the app in
+ * this resource reference has terminated.
+ *
+ * @return the filename associated with the cache entry, or null if the file
+ * is not found
+ */
+ @Override
+ public String addResourceReference(String key, ApplicationId id,
+ String shortUserName) {
+ String interned = intern(key);
+ synchronized (interned) {
+ Entry entry = map.get(interned);
+ if (entry == null) { // it's not mapped
+ return null;
+ }
+ entry.addReference(new ResourceReference(id, shortUserName));
+ entry.updateAccessTime();
+ return entry.getFileName();
+ }
+ }
+
+ /**
+ * Returns the list of resource references currently registered under the
+ * cache entry. If the list is empty, it returns an empty collection. The
+ * returned collection is unmodifiable and a snapshot of the information at
+ * the time of the query. The state may change after this query returns. The
+ * caller should handle the situation that some or all of these resource
+ * references are no longer relevant.
+ *
+ * @return the collection that contains the resource references associated
+ * with the cache entry; or an empty collection if no resource
+ * references are registered under this entry
+ */
+ @Override
+ public Collection getResourceReferences(String key) {
+ String interned = intern(key);
+ synchronized (interned) {
+ Entry entry = map.get(interned);
+ if (entry == null) {
+ return Collections.emptySet();
+ }
+ Set refs =
+ new HashSet(entry.getResourceReferences());
+ return Collections.unmodifiableSet(refs);
+ }
+ }
+
+ /**
+ * Removes the provided resource reference from the entry. If the entry is
+ * removed (i.e. does not exist), nothing will be done.
+ */
+ @Override
+ public boolean removeResourceReference(String key, ResourceReference ref,
+ boolean updateAccessTime) {
+ String interned = intern(key);
+ synchronized (interned) {
+ boolean removed = false;
+ Entry entry = map.get(interned);
+ if (entry != null) {
+ Set entryRefs = entry.getResourceReferences();
+ removed = entryRefs.remove(ref);
+ if (updateAccessTime) {
+ entry.updateAccessTime();
+ }
+ }
+ return removed;
+ }
+ }
+
+ /**
+ * Removes the provided collection of resource references from the entry. If
+ * the entry is removed (i.e. does not exist), nothing will be done.
+ */
+ @Override
+ public void removeResourceRefs(String key,
+ Collection refs, boolean updateAccessTime) {
+ String interned = intern(key);
+ synchronized (interned) {
+ Entry entry = map.get(interned);
+ if (entry != null) {
+ Set entryRefs = entry.getResourceReferences();
+ entryRefs.removeAll(refs);
+ if (updateAccessTime) {
+ entry.updateAccessTime();
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes the given key from the store. Returns true if the key is found and
+ * removed or if the key is not found. Returns false if it was unable to
+ * remove it because the resource reference list was not empty.
+ */
+ public boolean removeKey(String key) {
+ String interned = intern(key);
+ synchronized (interned) {
+ Entry entry = map.get(interned);
+ if (entry == null) {
+ return true;
+ }
+
+ if (!entry.getResourceReferences().isEmpty()) {
+ return false;
+ }
+ // no users
+ map.remove(interned);
+ return true;
+ }
+ }
+
+ /**
+ * Obtains the access time for the entry of the given key. It represents the
+ * view of the entry at the time of the query. The value may have been updated
+ * at a later point.
+ *
+ * @return the access time of the entry if found; -1 if the entry is not found
+ */
+ @Override
+ public long getAccessTime(String key) {
+ String interned = intern(key);
+ synchronized (interned) {
+ Entry entry = map.get(interned);
+ return entry == null ? -1 : entry.getAccessTime();
+ }
+ }
+
+ /**
+ * This is to be used only for a test purpose. It must not be used in a real
+ * production situation.
+ */
+ @Override
+ public void clearCache() {
+ // TODO remove this method and rewrite the tests
+ // just clear it
+ map.clear();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/ResourceReference.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/ResourceReference.java
new file mode 100644
index 0000000..c585fce
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/ResourceReference.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * This is an object that represents the meta data associated with a reference
+ * to a resource in the shared cache.
+ */
+public class ResourceReference {
+ private final ApplicationId appId;
+ private final String shortUserName;
+
+ /**
+ * Create a resource reference.
+ *
+ * @param appId ApplicationId that is referencing a resource.
+ * @param shortUserName ShortUserName of the user that created
+ * the reference.
+ */
+ public ResourceReference(ApplicationId appId, String shortUserName) {
+ this.appId = appId;
+ this.shortUserName = shortUserName;
+ }
+
+ public ApplicationId getAppId() {
+ return this.appId;
+ }
+
+ public String getShortUserName() {
+ return this.shortUserName;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((appId == null) ? 0 : appId.hashCode());
+ result =
+ prime * result
+ + ((shortUserName == null) ? 0 : shortUserName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ResourceReference other = (ResourceReference) obj;
+ if (appId == null) {
+ if (other.appId != null)
+ return false;
+ } else if (!appId.equals(other.appId))
+ return false;
+ if (shortUserName == null) {
+ if (other.shortUserName != null)
+ return false;
+ } else if (!shortUserName.equals(other.shortUserName))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
new file mode 100644
index 0000000..c8995ea
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.sharedcachemanager.SCMContext;
+
+
+/**
+ * An abstract class for the data store used by the shared cache manager
+ * service. All implementations of methods in this interface need to be thread
+ * safe and atomic.
+ *
+ * All implementations of SCMStore must provide a constructor that
+ * takes SCMContext as a sole argument.
+ */
+@Private
+@Unstable
+public abstract class SCMStore extends AbstractService {
+ private final SCMContext context;
+
+ protected SCMStore(String name, SCMContext context) {
+ super(name);
+ this.context = context;
+ }
+
+ /**
+ * Add a key to the shared cache, along with the associated filename. If the
+ * key already exists no action is taken. If the key did not exist, the key is
+ * added and the access time is set.
+ *
+ * @param key checksum of the cache entry
+ * @param fileName string of the actual file
+ * @return the file name of the file in the cache entry
+ */
+ public abstract String addKey(String key, String fileName);
+
+
+ /**
+ * Remove an entry from the cache.
+ *
+ * @param key checksum of the cache entry
+ * @return true if the entry was removed or did not exist, false if the entry
+ * contained ApplicationIds and was not removed
+ */
+ public abstract boolean removeKey(String key);
+
+ /**
+ * Add a ResourceReference to a cache entry and update the
+ * entry's access time.
+ *
+ * @param key checksum of the cache entry
+ * @param id ApplicationId to add
+ * @param shortUsername the short username of the user responsible for adding
+ * the resource reference
+ * @return String name of the file corresponding to the cache entry if the id
+ * was added or already existed, null if the cache entry did not exist
+ */
+ public abstract String addResourceReference(String key, ApplicationId id,
+ String shortUsername);
+
+ /**
+ * Get all the ApplicationIds associated with the cache entry
+ *
+ * @param key checksum of the cache entry
+ * @return an unmodifiable collection of ResourceReferences. If
+ * the key does not exist, an empty set is returned.
+ */
+ public abstract Collection getResourceReferences(String key);
+
+ /**
+ * Get the access time for a cache entry.
+ *
+ * @param key checksum of the cache entry
+ * @return the access time in milliseconds of the cache entry, -1 if the entry
+ * does not exist
+ */
+ public abstract long getAccessTime(String key);
+
+ /**
+ * Remove a ResourceReference from a cache entry.
+ *
+ * @param key checksum of the cache entry
+ * @param ref the ResourceReference to remove
+ * @param updateAccessTime true if the call should update the access time for
+ * the entry
+ * @return true if the entry was removed, false otherwise
+ */
+ public abstract boolean removeResourceReference(String key,
+ ResourceReference ref, boolean updateAccessTime);
+
+ /**
+ * Remove a collection of ResourceReferences from a cache entry.
+ *
+ * @param key checksum of the cache entry
+ * @param refs the collection of ResourceReferences to remove
+ * @param updateAccessTime true if the call should update the access time for
+ * the entry
+ */
+ public abstract void removeResourceRefs(String key,
+ Collection refs, boolean updateAccessTime);
+
+ /**
+ * Permanently removes all entries from the cache.
+ *
+ * This method exists solely to support some tests, and must not be used in a
+ * production situation.
+ */
+ public abstract void clearCache();
+
+ /**
+ * Returns the SCMContext.
+ *
+ * @return SCMContext
+ */
+ protected SCMContext getSCMContext() {
+ return context;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
new file mode 100644
index 0000000..c3c8a7c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.sharedcachemanager.SCMContext;
+import org.junit.Test;
+
+public class TestInMemorySCMStore {
+ @Test
+ public void testAddKeyConcurrency() throws Exception {
+ final SCMStore store = new InMemorySCMStore(new SCMContext());
+ final String key = "key1";
+ int count = 5;
+ ExecutorService exec = Executors.newFixedThreadPool(count);
+ List> futures = new ArrayList>(count);
+ final CountDownLatch start = new CountDownLatch(1);
+ for (int i = 0; i < count; i++) {
+ final String fileName = "foo-" + i + ".jar";
+ Callable task = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ String result = store.addKey(key, fileName);
+ System.out.println("fileName: " + fileName + ", result: " + result);
+ return result;
+ }
+ };
+ futures.add(exec.submit(task));
+ }
+ // start them all at the same time
+ start.countDown();
+ // check the result; they should all agree with the value
+ Set results = new HashSet();
+ for (Future future: futures) {
+ results.add(future.get());
+ }
+ assertSame(1, results.size());
+ exec.shutdown();
+ store.close();
+ }
+
+ @Test
+ public void testAddAppIdNonExistentKey() throws Exception {
+ SCMStore store = new InMemorySCMStore(new SCMContext());
+ String key = "key1";
+ ApplicationId id = createAppId(1, 1L);
+ // try adding an app id without adding the key first
+ assertNull(store.addResourceReference(key, id, "user"));
+ store.close();
+ }
+
+ @Test
+ public void testRemoveKeyEmptyAppIds() throws Exception {
+ SCMStore store = new InMemorySCMStore(new SCMContext());
+ String key = "key1";
+ String fileName = "foo.jar";
+ // first add key
+ store.addKey(key, fileName);
+ // try removing the key; it should return true
+ assertTrue(store.removeKey(key));
+ store.close();
+ }
+
+ @Test
+ public void testAddAppIdRemoveKey() throws Exception {
+ SCMStore store = new InMemorySCMStore(new SCMContext());
+ String key = "key1";
+ ApplicationId id = createAppId(1, 1L);
+ String user = "user";
+ // add the key, and then add an app id
+ store.addKey(key, "foo.jar");
+ store.addResourceReference(key, id, user);
+ // removeKey should return false
+ assertTrue(!store.removeKey(key));
+ // the entry and the app id should be intact
+ Collection refs = store.getResourceReferences(key);
+ assertTrue(refs != null);
+ assertEquals(Collections.singleton(new ResourceReference(id, user)), refs);
+ store.close();
+ }
+
+ @Test
+ public void testAddAppIdConcurrency() throws Exception {
+ final SCMStore store = new InMemorySCMStore(new SCMContext());
+ final String key = "key1";
+ final String user = "user";
+ String fileName = "foo.jar";
+
+ // first add the key
+ store.addKey(key, fileName);
+
+ // make concurrent addAppId calls (clients)
+ int count = 5;
+ ExecutorService exec = Executors.newFixedThreadPool(count);
+ List> futures = new ArrayList>(count);
+ final CountDownLatch start = new CountDownLatch(1);
+ for (int i = 0; i < count; i++) {
+ final ApplicationId id = createAppId(i, i);
+ Callable task = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ return store.addResourceReference(key, id, user);
+ }
+ };
+ futures.add(exec.submit(task));
+ }
+ // start them all at the same time
+ start.countDown();
+ // check the result
+ Set results = new HashSet();
+ for (Future future: futures) {
+ results.add(future.get());
+ }
+ // they should all have the same file name
+ assertSame(1, results.size());
+ assertEquals(Collections.singleton(fileName), results);
+ // there should be 5 app ids as a result
+ Collection refs = store.getResourceReferences(key);
+ assertSame(count, refs.size());
+ exec.shutdown();
+ store.close();
+ }
+
+ @Test
+ public void testAddAppIdAddKeyConcurrency() throws Exception {
+ final SCMStore store = new InMemorySCMStore(new SCMContext());
+ final String key = "key1";
+ final String fileName = "foo.jar";
+ final String user = "user";
+ final ApplicationId id = createAppId(1, 1L);
+ // add the key and add the id at the same time
+ ExecutorService exec = Executors.newFixedThreadPool(2);
+ final CountDownLatch start = new CountDownLatch(1);
+ Callable addKeyTask = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ return store.addKey(key, fileName);
+ }
+ };
+ Callable addAppIdTask = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ return store.addResourceReference(key, id, user);
+ }
+ };
+ Future addAppIdFuture = exec.submit(addAppIdTask);
+ Future addKeyFuture = exec.submit(addKeyTask);
+ // start them at the same time
+ start.countDown();
+ // get the results
+ String addKeyResult = addKeyFuture.get();
+ String addAppIdResult = addAppIdFuture.get();
+ assertEquals(fileName, addKeyResult);
+ System.out.println("addAppId() result: " + addAppIdResult);
+ // it may be null or the fileName depending on the timing
+ assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName));
+ exec.shutdown();
+ store.close();
+ }
+
+ @Test
+ public void testRemoveAppIds() throws Exception {
+ SCMStore store = new InMemorySCMStore(new SCMContext());
+ String key = "key1";
+ String fileName = "foo.jar";
+ String user = "user";
+ // first add the key
+ store.addKey(key, fileName);
+ // add an app id
+ ApplicationId id = createAppId(1, 1L);
+ ResourceReference myRef = new ResourceReference(id, user);
+ String result = store.addResourceReference(key, id, user);
+ assertEquals(fileName, result);
+ Collection refs = store.getResourceReferences(key);
+ assertSame(1, refs.size());
+ assertEquals(Collections.singleton(myRef), refs);
+ // remove the same key
+ store.removeResourceRefs(key, Collections.singleton(myRef), true);
+ Collection newRefs = store.getResourceReferences(key);
+ assertTrue(newRefs == null || newRefs.isEmpty());
+ store.close();
+ }
+
+ @Test
+ public void testRemoveAppId() throws Exception {
+ SCMStore store = new InMemorySCMStore(new SCMContext());
+ String key = "key1";
+ String fileName = "foo.jar";
+ String user = "user";
+ // first add the key
+ store.addKey(key, fileName);
+ // add an app id
+ ApplicationId id = createAppId(1, 1L);
+ ResourceReference myRef = new ResourceReference(id, user);
+ String result = store.addResourceReference(key, id, user);
+ assertEquals(fileName, result);
+ Collection refs = store.getResourceReferences(key);
+ assertSame(1, refs.size());
+ assertEquals(Collections.singleton(myRef), refs);
+ // remove the same key
+ store.removeResourceReference(key, myRef, true);
+ Collection newRefs = store.getResourceReferences(key);
+ assertTrue(newRefs == null || newRefs.isEmpty());
+ store.close();
+ }
+
+ @Test
+ public void testBootstrapping() throws Exception {
+ Map initialCachedEntries = new HashMap();
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ String key = String.valueOf(i);
+ String fileName = key + ".jar";
+ initialCachedEntries.put(key, fileName);
+ }
+
+ SCMContext context = new SCMContext(initialCachedEntries, null);
+ SCMStore store = new InMemorySCMStore(context);
+ ApplicationId id = createAppId(1, 1L);
+ // the entries from the cached entries should now exist
+ for (int i = 0; i < count; i++) {
+ String key = String.valueOf(i);
+ String fileName = key + ".jar";
+ String result = store.addResourceReference(key, id, "user");
+ // the value should not be null (i.e. it has the key) and the filename should match
+ assertEquals(fileName, result);
+ // the initial input should be emptied
+ assertTrue(initialCachedEntries.isEmpty());
+ }
+ store.close();
+ }
+
+ private ApplicationId createAppId(int id, long timestamp) {
+ return ApplicationId.newInstance(timestamp, id);
+ }
+}