From 0e1f2088d63dfa4be1bc4163330a5f3935c3ce01 Mon Sep 17 00:00:00 2001
From: nnaemeka
Date: Tue, 7 Jun 2016 12:24:34 -0700
Subject: [PATCH] Make Node-Manager Download-Resource Component extensible.

---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java | 14 ++++
 .../org/apache/hadoop/yarn/util/FSDownload.java    | 10 ++-
 .../yarn/util/resource/FSResourceDownloader.java   | 80 ++++++++++++++++++++++
 .../yarn/util/resource/ResourceDownloader.java     | 59 ++++++++++++++++
 .../localizer/ContainerLocalizer.java              | 24 ++++++-
 .../localizer/ResourceLocalizationService.java     | 16 ++++-
 6 files changed, 196 insertions(+), 7 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/FSResourceDownloader.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceDownloader.java

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 84ec894..2c062e3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1671,6 +1671,20 @@ public static boolean isAclEnabled(Configuration conf) {
           + "DefaultRequestInterceptor";
 
   /**
+   * The class to use for downloading resources. NOTE: this class must implement
+   * {@code org.apache.hadoop.yarn.util.resource.ResourceDownloader}.
+   */
+  public static final String NM_RESOURCE_DOWNLOADER_CLASS = NM_PREFIX
+      + "resource-downloader.class";
+
+  /**
+   * The default node-manager resource-downloader class. NOTE: this is used IFF
+   * {@link #NM_RESOURCE_DOWNLOADER_CLASS} isn't specified.
+   */
+  public static final String DEFAULT_NM_RESOURCE_DOWNLOADER_CLASS =
+      "org.apache.hadoop.yarn.util.resource.FSResourceDownloader";
+
+  /**
    * Default platform-agnostic CLASSPATH for YARN applications. A
    * comma-separated list of CLASSPATH entries. The parameter expansion marker
    * will be replaced with real parameter expansion marker ('%' for Windows and
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index bd9c907..39e5223 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -247,7 +247,15 @@ private static FileStatus getFileStatus(final FileSystem fs, final Path path,
     }
   }
 
-  private Path copy(Path sCopy, Path dstdir) throws IOException {
+  /**
+   * Copy a file from a source file {@link Path} to a destination directory
+   * {@link Path}.
+   * @param sCopy The source location file-path.
+   * @param dstdir The destination directory-path.
+   * @return The full destination file-path.
+   * @throws IOException if an I/O error occurs while accessing the source.
+   */
+  protected Path copy(Path sCopy, Path dstdir) throws IOException {
     FileSystem sourceFs = sCopy.getFileSystem(conf);
     Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
     FileStatus sStat = sourceFs.getFileStatus(sCopy);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/FSResourceDownloader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/FSResourceDownloader.java
new file mode 100644
index 0000000..23c6d42
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/FSResourceDownloader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util.resource;
+
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * The file-system implementation of {@link ResourceDownloader}. Delegates the
+ * actual download to an {@link FSDownload} created by {@code initialize}.
+ */
+public class FSResourceDownloader implements ResourceDownloader {
+  private FSDownload fsDownloader;
+
+  /**
+   * Creates a new, uninitialized {@link FSResourceDownloader} instance.
+   */
+  public FSResourceDownloader() {
+    this.fsDownloader = null;
+  }
+
+  /**
+   * Initializes this instance of the {@link FSResourceDownloader} class.
+   * @param context The file context.
+   * @param user The user-group information.
+   * @param configuration The configuration information.
+   * @param path The destination path.
+   * @param resource The local resource information.
+   * @param status The file-status cache; this can be {@code null}.
+   * @throws IllegalStateException if already initialized.
+   */
+  public void initialize(FileContext context, UserGroupInformation user,
+      Configuration configuration, Path path, LocalResource resource,
+      LoadingCache<Path,Future<FileStatus>> status) {
+    Preconditions.checkState(this.fsDownloader == null,
+        "FSResourceDownloader already initialized!");
+    this.fsDownloader =
+        new FSDownload(context, user, configuration, path, resource, status);
+  }
+
+  /**
+   * Downloads a resource to the specified {@link Path}, or throws an exception
+   * if unable to do so.
+   * @return The {@link Path} to the downloaded resource.
+   * @throws IllegalStateException if not initialized.
+   */
+  @Override
+  public Path call() throws Exception {
+    Preconditions.checkState(this.fsDownloader != null,
+        "FSResourceDownloader not initialized!");
+    return this.fsDownloader.call();
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceDownloader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceDownloader.java
new file mode 100644
index 0000000..bb228ef
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceDownloader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util.resource;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Downloads a {@link LocalResource} to a destination location ({@link Path}).
+ */
+public interface ResourceDownloader extends Callable<Path> {
+  /**
+   * Initializes this instance of the {@link ResourceDownloader} class. This
+   * should only be called once, right after the instance is constructed.
+   * @param context The file context.
+   * @param user The user-group information.
+   * @param configuration The configuration information.
+   * @param path The destination path.
+   * @param resource The local-resource information.
+   * @param status The file-status cache; this can be {@code null}.
+   * @throws RuntimeException if already initialized.
+   */
+  public void initialize(FileContext context, UserGroupInformation user,
+      Configuration configuration, Path path, LocalResource resource,
+      LoadingCache<Path,Future<FileStatus>> status);
+
+  /**
+   * Downloads a resource to the specified {@link Path}, or throws an exception
+   * if unable to do so.
+   * @return The {@link Path} to the downloaded resource.
+   */
+  public Path call() throws Exception;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index 57cc346..32495c3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -69,7 +70,8 @@
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.FSDownload;
+import org.apache.hadoop.yarn.util.resource.FSResourceDownloader;
+import org.apache.hadoop.yarn.util.resource.ResourceDownloader;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -196,10 +198,25 @@ ExecutorService createDownloadThreadPool() {
     return new ExecutorCompletionService(exec);
   }
 
+  /**
+   * Initializes a download routine for a remote {@link LocalResource}, to a
+   * local {@link Path}, for a given {@link UserGroupInformation}.
+   * @param path The destination path.
+   * @param rsrc The local resource information.
+   * @param ugi The user-group information.
+   * @return A method that performs the actual download.
+   * @throws IOException if the destination directory check fails.
+   */
   Callable download(Path path, LocalResource rsrc,
-      UserGroupInformation ugi) throws IOException {
+      UserGroupInformation ugi) throws IOException, YarnException {
     DiskChecker.checkDir(new File(path.toUri().getRawPath()));
-    return new FSDownload(lfs, ugi, conf, path, rsrc);
+    // create and initialize the actual resource-downloader.
+    ResourceDownloader downloader =
+        ReflectionUtils.newInstance(this.conf.getClass(
+            YarnConfiguration.NM_RESOURCE_DOWNLOADER_CLASS,
+            FSResourceDownloader.class, ResourceDownloader.class), this.conf);
+    downloader.initialize(lfs, ugi, conf, path, rsrc, null);
+    return downloader;
   }
 
   static long getEstimatedSize(LocalResource rsrc) {
@@ -267,6 +284,7 @@ protected void localizeFiles(LocalizationProtocol nodemanager,
         } catch (InterruptedException e) {
           return;
         } catch (YarnException e) {
+          LOG.error("Unexpected error.", e);
           // TODO cleanup
           throw e;
         }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b2413ad..d8cfb69 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -72,6 +72,7 @@
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
@@ -83,6 +84,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -134,6 +136,8 @@
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
+import org.apache.hadoop.yarn.util.resource.FSResourceDownloader;
+import org.apache.hadoop.yarn.util.resource.ResourceDownloader;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -839,9 +843,15 @@ public void addResource(LocalizerResourceRequestEvent request) {
             // explicitly synchronize pending here to avoid future task
             // completing and being dequeued before pending updated
             synchronized (pending) {
-              pending.put(queue.submit(new FSDownload(lfs, null, conf,
-                  publicDirDestPath, resource, request.getContext().getStatCache())),
-                  request);
+              // public resources are downloaded without a user (null ugi).
+              ResourceDownloader downloader =
+                  ReflectionUtils.newInstance(conf.getClass(
+                      YarnConfiguration.NM_RESOURCE_DOWNLOADER_CLASS,
+                      FSResourceDownloader.class, ResourceDownloader.class),
+                      conf);
+              downloader.initialize(lfs, null, conf, publicDirDestPath,
+                  resource, request.getContext().getStatCache());
+              pending.put(queue.submit(downloader), request);
             }
           } catch (IOException e) {
             rsrc.unlock();
-- 
2.8.1.windows.1