diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 241e0b7..24eed89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1225,6 +1225,13 @@ "nested.level"; public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; + // Shared Cache Manager Configs + public static final String SCM_PREFIX = SHARED_CACHE_PREFIX + "manager."; + + public static final String SCM_STORE_IMPL = SCM_PREFIX + "store.impl"; + public static final String DEFAULT_SCM_STORE_IMPL = + "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9916263..70b405d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1280,6 +1280,12 @@ yarn.sharedcache.nested.level 3 + + + The implementation to be used for the SCM store + yarn.sharedcache.manager.store.impl + org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index df94550..67bbf4a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; import com.google.common.annotations.VisibleForTesting; @@ -58,6 +59,7 @@ private static final Log LOG = LogFactory.getLog(SharedCacheManager.class); private Configuration conf; + private SCMStore store; public SharedCacheManager() { super("SharedCacheManager"); @@ -72,6 +74,9 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { try { SCMContext context = createSCMContext(appChecker, conf); + this.store = createSCMStoreService(conf, context); + addService(store); + } catch (IOException e) { LOG.error("Encountered unexpected exception while initializing the shared cache manager", e); @@ -163,6 +168,22 @@ SCMContext createSCMContext(AppChecker appChecker, Configuration conf) return initialCachedEntries; } + private static SCMStore createSCMStoreService(Configuration conf, + SCMContext context) { + String className = + conf.get(YarnConfiguration.SCM_STORE_IMPL, + YarnConfiguration.DEFAULT_SCM_STORE_IMPL); + SCMStore store = null; + try { + Class clazz = Class.forName(className); + Constructor cstr = clazz.getConstructor(SCMContext.class); + store = (SCMStore) cstr.newInstance(context); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + return store; + } + @Override protected synchronized void serviceStart() throws Exception { @@ -176,6 +197,14 @@ protected synchronized void serviceStop() throws Exception { super.serviceStop(); } + /** + * For testing purposes only. + */ + @VisibleForTesting + SCMStore getSCMStore() { + return this.store; + } + public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/Entry.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/Entry.java new file mode 100644 index 0000000..f527bbc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/Entry.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * Class that encapsulates the cache entry. + * + * The instances are not thread safe. Any operation that uses the entry must + * use thread-safe mechanisms to ensure safe access with the only exception of + * the filename. + */ +class Entry { + private long accessTime; + private final Set refs; + private final String fileName; + + Entry(String fileName) { + this.accessTime = System.currentTimeMillis(); + this.refs = new HashSet(); + this.fileName = fileName; + } + + long getAccessTime() { + return accessTime; + } + + void updateAccessTime() { + accessTime = System.currentTimeMillis(); + } + + String getFileName() { + return this.fileName; + } + + Set getResourceReferences() { + return this.refs; + } + + boolean addReference(ResourceReference ref) { + return this.refs.add(ref); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java new file mode 100644 index 0000000..ec64e1b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.sharedcachemanager.SCMContext; + +/** + * A thread safe version of an in-memory SCM store. The thread safety is + * implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap + * is used to allow concurrency to entries and their mapping, and (2) a key + * level lock is used to ensure mutual exclusion of any operation that accesses + * the entry under the same key. + *
+ * To ensure safe key-level locking, we use the original string key and intern + * it weakly using hadoop's StringInterner. It avoids the pitfalls + * of using built-in String interning. The interned strings are also weakly + * referenced, so it can be garbage collected once it is done. And there is + * little risk of keys being available for other parts of the code so they can + * be used as locks accidentally. + */ +public class InMemorySCMStore extends SCMStore { + private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class); + + private final Map map = new ConcurrentHashMap(); + + public InMemorySCMStore(SCMContext context) { + super(InMemorySCMStore.class.getName(), context); + // bootstrap itself + bootstrap(context); + } + + private String intern(String key) { + return StringInterner.weakIntern(key); + } + + /** + * The in-memory store bootstraps itself from the shared cache entries that + * exist in HDFS. If that information is passed to it via + * SCMContext, it starts with that state. + */ + private void bootstrap(SCMContext context) { + Map initialCachedEntries = context.getInitialCachedEntries(); + if (initialCachedEntries != null) { + LOG.info("Bootstrapping from " + initialCachedEntries.size() + + " entries from the storage"); + Iterator> it = + initialCachedEntries.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + String key = intern(e.getKey()); + String fileName = e.getValue(); + Entry entry = new Entry(fileName); + // we don't hold the lock for this as it is done as part of the + // constructor + map.put(key, entry); + // clear out the entry to reduce the footprint + it.remove(); + } + LOG.info("Bootstrapping complete"); + } + } + + /** + * Adds the given resource to the store under the key and the filename. If the + * entry is already found, it returns the existing filename. It represents the + * state of the store at the time of this query. The entry may change or even + * be removed once this method returns. The caller should be prepared to + * handle that situation. + * + * @return the filename of the newly inserted entry or that of the existing + * entry + */ + @Override + public String addKey(String key, String fileName) { + String interned = intern(key); + synchronized (interned) { + Entry entry = map.get(interned); + if (entry == null) { + entry = new Entry(fileName); + map.put(interned, entry); + } + return entry.getFileName(); + } + } + + /** + * Adds the provided resource reference to the cache entry under the key, and + * updates the access time. If it returns a non-null value, the caller may + * safely assume that the entry will not be removed at least until the app in + * this resource reference has terminated. + * + * @return the filename associated with the cache entry, or null if the file + * is not found + */ + @Override + public String addResourceReference(String key, ApplicationId id, + String shortUserName) { + String interned = intern(key); + synchronized (interned) { + Entry entry = map.get(interned); + if (entry == null) { // it's not mapped + return null; + } + entry.addReference(new ResourceReference(id, shortUserName)); + entry.updateAccessTime(); + return entry.getFileName(); + } + } + + /** + * Returns the list of resource references currently registered under the + * cache entry. If the list is empty, it returns an empty collection. The + * returned collection is unmodifiable and a snapshot of the information at + * the time of the query. The state may change after this query returns. The + * caller should handle the situation that some or all of these resource + * references are no longer relevant. + * + * @return the collection that contains the resource references associated + * with the cache entry; or an empty collection if no resource + * references are registered under this entry + */ + @Override + public Collection getResourceReferences(String key) { + String interned = intern(key); + synchronized (interned) { + Entry entry = map.get(interned); + if (entry == null) { + return Collections.emptySet(); + } + Set refs = + new HashSet(entry.getResourceReferences()); + return Collections.unmodifiableSet(refs); + } + } + + /** + * Removes the provided resource reference from the entry. If the entry is + * removed (i.e. does not exist), nothing will be done. + */ + @Override + public boolean removeResourceReference(String key, ResourceReference ref, + boolean updateAccessTime) { + String interned = intern(key); + synchronized (interned) { + boolean removed = false; + Entry entry = map.get(interned); + if (entry != null) { + Set entryRefs = entry.getResourceReferences(); + removed = entryRefs.remove(ref); + if (updateAccessTime) { + entry.updateAccessTime(); + } + } + return removed; + } + } + + /** + * Removes the provided collection of resource references from the entry. If + * the entry is removed (i.e. does not exist), nothing will be done. + */ + @Override + public void removeResourceRefs(String key, + Collection refs, boolean updateAccessTime) { + String interned = intern(key); + synchronized (interned) { + Entry entry = map.get(interned); + if (entry != null) { + Set entryRefs = entry.getResourceReferences(); + entryRefs.removeAll(refs); + if (updateAccessTime) { + entry.updateAccessTime(); + } + } + } + } + + /** + * Removes the given key from the store. Returns true if the key is found and + * removed or if the key is not found. Returns false if it was unable to + * remove it because the resource reference list was not empty. + */ + public boolean removeKey(String key) { + String interned = intern(key); + synchronized (interned) { + Entry entry = map.get(interned); + if (entry == null) { + return true; + } + + if (!entry.getResourceReferences().isEmpty()) { + return false; + } + // no users + map.remove(interned); + return true; + } + } + + /** + * Obtains the access time for the entry of the given key. It represents the + * view of the entry at the time of the query. The value may have been updated + * at a later point. + * + * @return the access time of the entry if found; -1 if the entry is not found + */ + @Override + public long getAccessTime(String key) { + String interned = intern(key); + synchronized (interned) { + Entry entry = map.get(interned); + return entry == null ? -1 : entry.getAccessTime(); + } + } + + /** + * This is to be used only for a test purpose. It must not be used in a real + * production situation. + */ + @Override + public void clearCache() { + // TODO remove this method and rewrite the tests + // just clear it + map.clear(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/ResourceReference.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/ResourceReference.java new file mode 100644 index 0000000..c585fce --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/ResourceReference.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * This is an object that represents the meta data associated with a reference + * to a resource in the shared cache. + */ +public class ResourceReference { + private final ApplicationId appId; + private final String shortUserName; + + /** + * Create a resource reference. + * + * @param appId ApplicationId that is referencing a resource. + * @param shortUserName ShortUserName of the user that created + * the reference. + */ + public ResourceReference(ApplicationId appId, String shortUserName) { + this.appId = appId; + this.shortUserName = shortUserName; + } + + public ApplicationId getAppId() { + return this.appId; + } + + public String getShortUserName() { + return this.shortUserName; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((appId == null) ? 0 : appId.hashCode()); + result = + prime * result + + ((shortUserName == null) ? 0 : shortUserName.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ResourceReference other = (ResourceReference) obj; + if (appId == null) { + if (other.appId != null) + return false; + } else if (!appId.equals(other.appId)) + return false; + if (shortUserName == null) { + if (other.shortUserName != null) + return false; + } else if (!shortUserName.equals(other.shortUserName)) + return false; + return true; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java new file mode 100644 index 0000000..91b6fac --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.sharedcachemanager.SCMContext; + + +/** + * An abstract class for the data store used by the shared cache manager + * service. All implementations of methods in this interface need to be thread + * safe and atomic. + *
+ * All implementations of SCMStore must provide a constructor that + * takes SCMContext as a sole argument. + */ +@Private +@Unstable +public abstract class SCMStore extends AbstractService { + private final SCMContext context; + + protected SCMStore(String name, SCMContext context) { + super(name); + this.context = context; + } + + /** + * Add a key to the shared cache, along with the associated filename. If the + * key already exists no action is taken. If the key did not exist, the key is + * added and the access time is set. + * + * @param key checksum of the cache entry + * @param fileName string of the actual file + * @return the file name of the file in the cache entry + */ + public abstract String addKey(String key, String fileName); + + + /** + * Remove an entry from the cache. + * + * @param key checksum of the cache entry + * @return true if the entry was removed or did not exist, false if the entry + * contained ApplicationIds and was not removed + */ + public abstract boolean removeKey(String key); + + /** + * Add a ResourceReference to a cache entry and update the + * entry's access time. + * + * @param key checksum of the cache entry + * @param id ApplicationId to add + * @param shortUsername the short username of the user responsible for adding + * the resource reference + * @return String name of the file corresponding to the cache entry if the id + * was added or already existed, null if the cache entry did not exist + */ + public abstract String addResourceReference(String key, ApplicationId id, + String shortUsername); + + /** + * Get all the ApplicationIds associated with the cache entry + * + * @param key checksum of the cache entry + * @return an unmodifiable collection of ResourceReferences. If + * the key does not exist, an empty set is returned. + */ + public abstract Collection getResourceReferences(String key); + + /** + * Get the access time for a cache entry. + * + * @param key checksum of the cache entry + * @return the access time in milliseconds of the cache entry, -1 if the entry + * does not exist + */ + public abstract long getAccessTime(String key); + + /** + * Remove a ResourceReference from a cache entry. + * + * @param key checksum of the cache entry + * @param ref the ResourceReference to remove + * @param updateAccessTime true if the call should update the access time for + * the entry + * @return true if the entry was removed, false otherwise + */ + public abstract boolean removeResourceReference(String key, + ResourceReference ref, boolean updateAccessTime); + + /** + * Remove a collection of ResourceReferences from a cache entry. + * + * @param key checksum of the cache entry + * @param refs the collection of ResourceReferences to remove + * @param updateAccessTime true if the call should update the access time for + * the entry + */ + public abstract void removeResourceRefs(String key, + Collection refs, boolean updateAccessTime); + + /** + * Permanently removes all entries from the cache. + *
+ * This method exists solely to support some tests, and must not be used in a + * production situation. + */ + public abstract void clearCache(); + + /** + * Returns the SCMContext. + * @return + */ + protected SCMContext getSCMContext() { + return context; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java new file mode 100644 index 0000000..c3c8a7c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.sharedcachemanager.SCMContext; +import org.junit.Test; + +public class TestInMemorySCMStore { + @Test + public void testAddKeyConcurrency() throws Exception { + final SCMStore store = new InMemorySCMStore(new SCMContext()); + final String key = "key1"; + int count = 5; + ExecutorService exec = Executors.newFixedThreadPool(count); + List> futures = new ArrayList>(count); + final CountDownLatch start = new CountDownLatch(1); + for (int i = 0; i < count; i++) { + final String fileName = "foo-" + i + ".jar"; + Callable task = new Callable() { + public String call() throws Exception { + start.await(); + String result = store.addKey(key, fileName); + System.out.println("fileName: " + fileName + ", result: " + result); + return result; + } + }; + futures.add(exec.submit(task)); + } + // start them all at the same time + start.countDown(); + // check the result; they should all agree with the value + Set results = new HashSet(); + for (Future future: futures) { + results.add(future.get()); + } + assertSame(1, results.size()); + exec.shutdown(); + store.close(); + } + + @Test + public void testAddAppIdNonExistentKey() throws Exception { + SCMStore store = new InMemorySCMStore(new SCMContext()); + String key = "key1"; + ApplicationId id = createAppId(1, 1L); + // try adding an app id without adding the key first + assertNull(store.addResourceReference(key, id, "user")); + store.close(); + } + + @Test + public void testRemoveKeyEmptyAppIds() throws Exception { + SCMStore store = new InMemorySCMStore(new SCMContext()); + String key = "key1"; + String fileName = "foo.jar"; + // first add key + store.addKey(key, fileName); + // try removing the key; it should return true + assertTrue(store.removeKey(key)); + store.close(); + } + + @Test + public void testAddAppIdRemoveKey() throws Exception { + SCMStore store = new InMemorySCMStore(new SCMContext()); + String key = "key1"; + ApplicationId id = createAppId(1, 1L); + String user = "user"; + // add the key, and then add an app id + store.addKey(key, "foo.jar"); + store.addResourceReference(key, id, user); + // removeKey should return false + assertTrue(!store.removeKey(key)); + // the entry and the app id should be intact + Collection refs = store.getResourceReferences(key); + assertTrue(refs != null); + assertEquals(Collections.singleton(new ResourceReference(id, user)), refs); + store.close(); + } + + @Test + public void testAddAppIdConcurrency() throws Exception { + final SCMStore store = new InMemorySCMStore(new SCMContext()); + final String key = "key1"; + final String user = "user"; + String fileName = "foo.jar"; + + // first add the key + store.addKey(key, fileName); + + // make concurrent addAppId calls (clients) + int count = 5; + ExecutorService exec = Executors.newFixedThreadPool(count); + List> futures = new ArrayList>(count); + final CountDownLatch start = new CountDownLatch(1); + for (int i = 0; i < count; i++) { + final ApplicationId id = createAppId(i, i); + Callable task = new Callable() { + public String call() throws Exception { + start.await(); + return store.addResourceReference(key, id, user); + } + }; + futures.add(exec.submit(task)); + } + // start them all at the same time + start.countDown(); + // check the result + Set results = new HashSet(); + for (Future future: futures) { + results.add(future.get()); + } + // they should all have the same file name + assertSame(1, results.size()); + assertEquals(Collections.singleton(fileName), results); + // there should be 5 app ids as a result + Collection refs = store.getResourceReferences(key); + assertSame(count, refs.size()); + exec.shutdown(); + store.close(); + } + + @Test + public void testAddAppIdAddKeyConcurrency() throws Exception { + final SCMStore store = new InMemorySCMStore(new SCMContext()); + final String key = "key1"; + final String fileName = "foo.jar"; + final String user = "user"; + final ApplicationId id = createAppId(1, 1L); + // add the key and add the id at the same time + ExecutorService exec = Executors.newFixedThreadPool(2); + final CountDownLatch start = new CountDownLatch(1); + Callable addKeyTask = new Callable() { + public String call() throws Exception { + start.await(); + return store.addKey(key, fileName); + } + }; + Callable addAppIdTask = new Callable() { + public String call() throws Exception { + start.await(); + return store.addResourceReference(key, id, user); + } + }; + Future addAppIdFuture = exec.submit(addAppIdTask); + Future addKeyFuture = exec.submit(addKeyTask); + // start them at the same time + start.countDown(); + // get the results + String addKeyResult = addKeyFuture.get(); + String addAppIdResult = addAppIdFuture.get(); + assertEquals(fileName, addKeyResult); + System.out.println("addAppId() result: " + addAppIdResult); + // it may be null or the fileName depending on the timing + assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName)); + exec.shutdown(); + store.close(); + } + + @Test + public void testRemoveAppIds() throws Exception { + SCMStore store = new InMemorySCMStore(new SCMContext()); + String key = "key1"; + String fileName = "foo.jar"; + String user = "user"; + // first add the key + store.addKey(key, fileName); + // add an app id + ApplicationId id = createAppId(1, 1L); + ResourceReference myRef = new ResourceReference(id, user); + String result = store.addResourceReference(key, id, user); + assertEquals(fileName, result); + Collection refs = store.getResourceReferences(key); + assertSame(1, refs.size()); + assertEquals(Collections.singleton(myRef), refs); + // remove the same key + store.removeResourceRefs(key, Collections.singleton(myRef), true); + Collection newRefs = store.getResourceReferences(key); + assertTrue(newRefs == null || newRefs.isEmpty()); + store.close(); + } + + @Test + public void testRemoveAppId() throws Exception { + SCMStore store = new InMemorySCMStore(new SCMContext()); + String key = "key1"; + String fileName = "foo.jar"; + String user = "user"; + // first add the key + store.addKey(key, fileName); + // add an app id + ApplicationId id = createAppId(1, 1L); + ResourceReference myRef = new ResourceReference(id, user); + String result = store.addResourceReference(key, id, user); + assertEquals(fileName, result); + Collection refs = store.getResourceReferences(key); + assertSame(1, refs.size()); + assertEquals(Collections.singleton(myRef), refs); + // remove the same key + store.removeResourceReference(key, myRef, true); + Collection newRefs = store.getResourceReferences(key); + assertTrue(newRefs == null || newRefs.isEmpty()); + store.close(); + } + + @Test + public void testBootstrapping() throws Exception { + Map initialCachedEntries = new HashMap(); + int count = 10; + for (int i = 0; i < count; i++) { + String key = String.valueOf(i); + String fileName = key + ".jar"; + initialCachedEntries.put(key, fileName); + } + + SCMContext context = new SCMContext(initialCachedEntries, null); + SCMStore store = new InMemorySCMStore(context); + ApplicationId id = createAppId(1, 1L); + // the entries from the cached entries should now exist + for (int i = 0; i < count; i++) { + String key = String.valueOf(i); + String fileName = key + ".jar"; + String result = store.addResourceReference(key, id, "user"); + // the value should not be null (i.e. it has the key) and the filename should match + assertEquals(fileName, result); + // the initial input should be emptied + assertTrue(initialCachedEntries.isEmpty()); + } + store.close(); + } + + private ApplicationId createAppId(int id, long timestamp) { + return ApplicationId.newInstance(timestamp, id); + } +}