diff --git hadoop-yarn-project/hadoop-yarn/bin/yarn hadoop-yarn-project/hadoop-yarn/bin/yarn index 200ab27..b54a2d8 100644 --- hadoop-yarn-project/hadoop-yarn/bin/yarn +++ hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -66,6 +66,7 @@ function print_usage(){ echo " nodemanager run a nodemanager on each slave" echo " timelineserver run the timeline server" echo " rmadmin admin tools" + echo " sharedcachemanager run the SharedCacheManager daemon" echo " version print the version" echo " jar run a jar file" echo " application prints application(s)" @@ -249,6 +250,9 @@ elif [ "$COMMAND" = "proxyserver" ] ; then if [ "$YARN_PROXYSERVER_HEAPSIZE" != "" ]; then JAVA_HEAP_MAX="-Xmx""$YARN_PROXYSERVER_HEAPSIZE""m" fi +elif [ "$COMMAND" = "sharedcachemanager" ] ; then + CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager' + YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS" elif [ "$COMMAND" = "version" ] ; then CLASS=org.apache.hadoop.util.VersionInfo YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 39d1dd3..6f411d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1226,6 +1226,28 @@ public static final String TIMELINE_SERVICE_KEYTAB = TIMELINE_SERVICE_PREFIX + "keytab"; + // /////////////////////////////// + // Shared Cache Configs + // /////////////////////////////// + public static final String SHARED_CACHE_PREFIX = "yarn.sharedcache."; + + // common configs + /** whether the shared cache is enabled/disabled */ + public static final String SHARED_CACHE_ENABLED = SHARED_CACHE_PREFIX + + "enabled"; + public static final boolean DEFAULT_SHARED_CACHE_ENABLED = false; + + /** The config key for the shared cache root directory. */ + public static final String SHARED_CACHE_ROOT = SHARED_CACHE_PREFIX + + "root"; + public static final String DEFAULT_SHARED_CACHE_ROOT = "/sharedcache"; + + /** The config key for the level of nested directories before getting to the + * checksum directory. */ + public static final String SHARED_CACHE_NESTED_LEVEL = SHARED_CACHE_PREFIX + + "nested.level"; + public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9b2b676..a029567 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1322,6 +1322,26 @@ org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore + + + Whether the shared cache is enabled + yarn.sharedcache.enabled + false + + + + The root directory for the shared cache + yarn.sharedcache.root + /sharedcache + + + + The level of nested directories before getting to the checksum + directories. It must be non-negative. + yarn.sharedcache.nested.level + 3 + + The interval that the yarn client library uses to poll the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/CacheStructureUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/CacheStructureUtil.java new file mode 100644 index 0000000..cd620c3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/CacheStructureUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcache; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A utility class that contains helper methods for dealing with the internal + * shared cache structure. + * + */ +public class CacheStructureUtil { + + private static final Log LOG = LogFactory.getLog(CacheStructureUtil.class); + + public static int getCacheDepth(Configuration conf) { + int cacheDepth = + conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL, + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL); + + if (cacheDepth <= 0) { + LOG.warn("Specified cache depth was less than or equal to zero." + + " Using default value instead. Default: " + + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL + + ", Specified: " + cacheDepth); + cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + } + + return cacheDepth; + } + + public static String getCacheEntryPath(int cacheDepth, String cacheRoot, + String checksum) { + + if (cacheDepth <= 0) { + throw new IllegalArgumentException( + "The cache depth must be greater than 0. Passed value: " + cacheDepth); + } + if (checksum.length() < cacheDepth) { + throw new IllegalArgumentException("The checksum passed was too short: " + + checksum); + } + + // Build the cache entry path to the specified depth. For example, if the + // depth is 3 and the checksum is 3c4f, the path would be: + // SHARED_CACHE_ROOT/3/c/4/3c4f + StringBuilder sb = new StringBuilder(cacheRoot); + for (int i = 0; i < cacheDepth; i++) { + sb.append(Path.SEPARATOR_CHAR); + sb.append(checksum.charAt(i)); + } + sb.append(Path.SEPARATOR_CHAR).append(checksum); + + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml new file mode 100644 index 0000000..0969274 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml @@ -0,0 +1,96 @@ + + + + 4.0.0 + + hadoop-yarn-server + org.apache.hadoop + 3.0.0-SNAPSHOT + + org.apache.hadoop + hadoop-yarn-server-sharedcachemanager + 3.0.0-SNAPSHOT + hadoop-yarn-server-sharedcachemanager + + + + ${project.parent.parent.basedir} + + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + + + junit + junit + test + + + org.mockito + mockito-all + test + + + org.apache.hadoop + hadoop-common + test-jar + test + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + test-jar + + + + + + + + + maven-jar-plugin + + + + test-jar + + test-compile + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java new file mode 100644 index 0000000..f0f67ee --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * An interface for checking whether an app is running so that the cleaner + * service may determine if it can safely remove a cached entry. + */ +public interface AppChecker { + /** + * Returns whether the app is in the active state. + * + * @return true if the app is found and is not in one of the completed states; + * false otherwise + * @throws IOException if there is an error in determining the app state + */ + boolean appIsActive(ApplicationId id) throws IOException; + + /** + * Returns the list of all active apps at the given time. + * + * @return the list of active apps, or an empty list if there is none + * @throws IOException if there is an error in obtaining the list + */ + Collection getAllActiveApps() throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java new file mode 100644 index 0000000..937fcc7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +/** + * An implementation of AppChecker that queries the resource manager remotely to + * determine whether the app is running. + */ +class RemoteAppChecker implements AppChecker { + private static final Log LOG = LogFactory.getLog(RemoteAppChecker.class); + private static final EnumSet ACTIVE_STATES = + EnumSet.complementOf(EnumSet.of(YarnApplicationState.FINISHED, + YarnApplicationState.FAILED, + YarnApplicationState.KILLED)); + + private final ApplicationClientProtocol applicationsManager; + + /** + * Creates an instance of RemoteAppChecker based on the configuration. + */ + public static AppChecker create(Configuration conf) { + // create the RM proxy based on the configuration + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress rmAddress = + conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + LOG.info("Connecting to ResourceManager at " + rmAddress); + ApplicationClientProtocol applicationsManager = + (ApplicationClientProtocol)rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); + LOG.info("Connected to ResourceManager at " + rmAddress); + return new RemoteAppChecker(applicationsManager); + } + + RemoteAppChecker(ApplicationClientProtocol applicationsManager) { + this.applicationsManager = applicationsManager; + } + + public boolean appIsActive(ApplicationId id) throws IOException { + GetApplicationReportRequest request = + GetApplicationReportRequest.newInstance(id); + + try { + GetApplicationReportResponse response = + applicationsManager.getApplicationReport(request); + ApplicationReport report = response.getApplicationReport(); + if (report == null) { + // the app does not exist + return false; + } + + return ACTIVE_STATES.contains(report.getYarnApplicationState()); + } catch (YarnException e) { + throw new IOException(e); + } + } + + public Collection getAllActiveApps() throws IOException { + GetApplicationsRequest request = + GetApplicationsRequest.newInstance(ACTIVE_STATES); + + try { + GetApplicationsResponse response = + applicationsManager.getApplications(request); + List activeApps = new ArrayList(); + List apps = response.getApplicationList(); + for (ApplicationReport app: apps) { + activeApps.add(app.getApplicationId()); + } + return activeApps; + } catch (YarnException e) { + throw new IOException(e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMContext.java new file mode 100644 index 0000000..6c4f1e5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMContext.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + + +/** + * A context object for the shared cache manager. + */ +public class SCMContext { + private final long startTime; + private final Map initialCachedEntries; + private final Collection initialActiveApps; + + public SCMContext() { + this(null, null); + } + + public SCMContext(Map initialCachedEntries, + Collection initialActiveApps) { + this.startTime = System.currentTimeMillis(); + this.initialCachedEntries = initialCachedEntries; + this.initialActiveApps = initialActiveApps; + } + + /** + * Returns the start time when the SCM context was created. + */ + public long getStartTime() { + return this.startTime; + } + + /** + * Returns the initial cached entries that was set when the SCM context was + * created. Note that the map is not thread safe. The user of this map is + * expected to provide thread safety or use it in a single-threaded manner. + *
+ * The user may remove entries from this map once it is consumed fully. + */ + public Map getInitialCachedEntries() { + return initialCachedEntries; + } + + /** + * Returns the initial active apps that was set when the SCM context was + * created. Note that this list is not thread safe. The user of this list is + * expected to provide thread safety or use it in a single-threaded manner. + *
+ * The user may remove entries from this list as apps are identified as no + * longer active. + */ + public Collection getInitialActiveApps() { + return initialActiveApps; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java new file mode 100644 index 0000000..a3d525e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This service maintains the shared cache meta data. It handles claiming and + * releasing of resources, all rpc calls from the client to the shared cache + * manager, and administrative commands. It also persists the shared cache meta + * data to a backend store, and cleans up stale entries on a regular basis. + */ +public class SharedCacheManager extends CompositeService { + /** + * Priority of the SharedCacheManager shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + private static final Log LOG = LogFactory.getLog(SharedCacheManager.class); + + private Configuration conf; + + public SharedCacheManager() { + super("SharedCacheManager"); + } + + @Override + protected synchronized void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + + AppChecker appChecker = RemoteAppChecker.create(conf); + + try { + SCMContext context = createSCMContext(appChecker, conf); + + } catch (IOException e) { + LOG.error("Encountered unexpected exception while initializing the shared cache manager", + e); + throw new YarnRuntimeException(e); + } + + super.serviceInit(conf); + } + + @VisibleForTesting + SCMContext createSCMContext(AppChecker appChecker, Configuration conf) + throws IOException { + // obtain the list of active apps as part of the context + LOG.info("Getting the active app list to initialize the SCM context"); + Collection activeAppList = getActiveAppList(appChecker); + LOG.info(activeAppList.size() + " apps recorded as active at this time"); + + // obtain the list of cached entries from the filesystem + LOG.info("Getting the list of all cached entries from the filesystem"); + FileSystem fs = FileSystem.get(conf); + Map initialCachedEntries = getInitialCachedEntries(fs, conf); + LOG.info(initialCachedEntries.size() + + " entries obtained from the filesystem"); + + SCMContext context = new SCMContext(initialCachedEntries, activeAppList); + return context; + } + + @VisibleForTesting + Collection getActiveAppList(AppChecker appChecker) + throws IOException { + return appChecker.getAllActiveApps(); + } + + @VisibleForTesting + Map getInitialCachedEntries(FileSystem fs, Configuration conf) + throws IOException { + // get the root directory for the shared cache + String location = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + Path root = new Path(location); + if (!fs.exists(root)) { + String message = "The shared cache root " + location + " was not found"; + LOG.error(message); + throw new IOException(message); + } + + int nestedLevel = CacheStructureUtil.getCacheDepth(conf); + // now traverse individual directories and process them + // the directory structure is specified by the nested level parameter + // (e.g. 9/c/d//file) + StringBuilder pattern = new StringBuilder(); + for (int i = 0; i < nestedLevel+1; i++) { + pattern.append("*/"); + } + pattern.append("*"); + + LOG.info("Querying for all individual cached entry files"); + FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString())); + int numEntries = entries == null ? 0 : entries.length; + LOG.info("Found " + numEntries + " files: processing for one entity per " + + "checksum"); + + Map initialCachedEntries = new HashMap(); + for (FileStatus entry: entries) { + Path file = entry.getPath(); + String fileName = file.getName(); + if (entry.isFile()) { + // get the parent to get the checksum + Path parent = file.getParent(); + if (parent != null) { + // the name of the immediate parent directory is the checksum + String key = parent.getName(); + // make sure we insert only one file per checksum whichever comes + // first + String mapped = initialCachedEntries.get(key); + if (mapped != null) { + LOG.warn("Key " + key + " is already mapped to file " + mapped + + "; file " + fileName + " will not be added"); + } else { + initialCachedEntries.put(key, fileName); + } + } + } + } + LOG.info("A total of " + initialCachedEntries.size() + + " files are now mapped"); + return initialCachedEntries; + } + + @Override + protected synchronized void serviceStart() throws Exception { + // Start metrics + DefaultMetricsSystem.initialize("SharedCacheManager"); + JvmMetrics.initSingleton("SharedCacheManager", null); + + super.serviceStart(); + } + + @Override + protected synchronized void serviceStop() throws Exception { + + DefaultMetricsSystem.shutdown(); + super.serviceStop(); + } + + public static void main(String[] args) { + Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG); + try { + Configuration conf = new YarnConfiguration(); + SharedCacheManager sharedCacheManager = new SharedCacheManager(); + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(sharedCacheManager), + SHUTDOWN_HOOK_PRIORITY); + sharedCacheManager.init(conf); + sharedCacheManager.start(); + } catch (Throwable t) { + LOG.fatal("Error starting SharedCacheManager", t); + System.exit(-1); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java new file mode 100644 index 0000000..6768d9d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.junit.Test; + +public class TestRemoteAppChecker { + + @Test + public void testNonExistentApp() throws Exception { + ApplicationClientProtocol applicationsManager = mock(ApplicationClientProtocol.class); + GetApplicationReportResponse response = mock(GetApplicationReportResponse.class); + // mock the response so that it returns a null application report + when(response.getApplicationReport()).thenReturn(null); + when(applicationsManager.getApplicationReport(isA(GetApplicationReportRequest.class))). + thenReturn(response); + + AppChecker appChecker = new RemoteAppChecker(applicationsManager); + assertFalse(appChecker.appIsActive(isA(ApplicationId.class))); + } + + @Test + public void testRunningApp() throws Exception { + ApplicationClientProtocol applicationsManager = mock(ApplicationClientProtocol.class); + GetApplicationReportResponse response = mock(GetApplicationReportResponse.class); + ApplicationReport report = mock(ApplicationReport.class); + when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.ACCEPTED); + // mock the response so that it returns an application report in the + // accepted state + when(response.getApplicationReport()).thenReturn(report); + when(applicationsManager.getApplicationReport(isA(GetApplicationReportRequest.class))). + thenReturn(response); + + AppChecker appChecker = new RemoteAppChecker(applicationsManager); + assertTrue(appChecker.appIsActive(isA(ApplicationId.class))); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheManager.java new file mode 100644 index 0000000..4be80d5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSharedCacheManager.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestSharedCacheManager { + private static final Configuration conf = new YarnConfiguration(); + + private static final FileSystem fs; + + private static final String KEY1 = "abcdefgh"; + private static final String FILE1 = "foo"; + private static final String KEY2 = "ijklmnop"; + private static final String FILE2 = "bar"; + + private static final Path baseDir; + private static final String root; + private static final String path1; + private static final String path2; + + static { + try { + fs = FileSystem.getLocal(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + + baseDir = + new Path("target", TestSharedCacheManager.class.getSimpleName()) + .makeQualified(fs.getUri(), fs.getWorkingDirectory()); + root = baseDir.toUri().getPath(); + // set the shared cache root + conf.set(YarnConfiguration.SHARED_CACHE_ROOT, root); + + path1 = getFullPath(KEY1, FILE1); + path2 = getFullPath(KEY2, FILE2); + } + + private static String getFullPath(String key, String fileName) { + int cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + return CacheStructureUtil.getCacheEntryPath(cacheDepth, root, key) + + Path.SEPARATOR + fileName; + } + + @Before + public void setUp() throws IOException { + FileContext files = FileContext.getLocalFSFileContext(); + files.mkdir(baseDir, null, true); + // add a couple of directories and files + createFile(new Path(path1), files); + createFile(new Path(path2), files); + } + + @After + public void shutDown() throws IOException { + FileContext files = FileContext.getLocalFSFileContext(); + files.delete(baseDir, true); + } + + private void createFile(Path path, FileContext files) throws IOException { + Path parent = path.getParent(); + files.mkdir(parent, null, true); + FSDataOutputStream out = null; + try { + out = files.create(path, EnumSet.of(CREATE, OVERWRITE)); + out.writeUTF("This is a test"); + } finally { + if (out != null) { + out.close(); + } + } + } + + @Test + public void testCreateSCMContext() throws Exception { + SharedCacheManager scm = new SharedCacheManager(); + + AppChecker appChecker = mock(AppChecker.class); + List ids = new ArrayList(); + ids.add(createAppId(1, 1L)); + ids.add(createAppId(2, 1L)); + ids.add(createAppId(3, 1L)); + when(appChecker.getAllActiveApps()).thenReturn(ids); + + SCMContext context = scm.createSCMContext(appChecker, conf); + Map initialCachedEntries = context.getInitialCachedEntries(); + // verify the initial cached entries + assertSame(2, initialCachedEntries.size()); + assertEquals(FILE1, initialCachedEntries.get(KEY1)); + assertEquals(FILE2, initialCachedEntries.get(KEY2)); + + // verify the active apps + assertSame(ids, context.getInitialActiveApps()); + } + + private ApplicationId createAppId(int id, long timestamp) { + return ApplicationId.newInstance(timestamp, id); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index b635d10..886773a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -39,6 +39,7 @@ hadoop-yarn-server-nodemanager hadoop-yarn-server-web-proxy hadoop-yarn-server-resourcemanager + hadoop-yarn-server-sharedcachemanager hadoop-yarn-server-tests hadoop-yarn-server-applicationhistoryservice