diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java index f14a136..7ece41b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java @@ -48,6 +48,14 @@ public static LocalResource newInstance(URL url, LocalResourceType type, LocalResourceVisibility visibility, long size, long timestamp, String pattern) { + return newInstance(url, type, visibility, size, timestamp, pattern, false); + } + + @Public + @Stable + public static LocalResource newInstance(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp, + String pattern, boolean shouldBeUploadedToSharedCache) { LocalResource resource = Records.newRecord(LocalResource.class); resource.setResource(url); resource.setType(type); @@ -55,6 +63,7 @@ public static LocalResource newInstance(URL url, LocalResourceType type, resource.setSize(size); resource.setTimestamp(timestamp); resource.setPattern(pattern); + resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); return resource; } @@ -65,6 +74,15 @@ public static LocalResource newInstance(URL url, LocalResourceType type, return newInstance(url, type, visibility, size, timestamp, null); } + @Public + @Stable + public static LocalResource newInstance(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp, + boolean shouldBeUploadedToSharedCache) { + return newInstance(url, type, visibility, size, timestamp, null, + shouldBeUploadedToSharedCache); + } + /** * Get the location of the resource to be localized. * @return location of the resource to be localized @@ -170,4 +188,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type, @Public @Stable public abstract void setPattern(String pattern); + + /** + * NM uses it to decide whether it is necessary to upload the resource + * to the shared cache. + */ + @Public + @Stable + public abstract boolean getShouldBeUploadedToSharedCache(); + + /** + * Inform NM whether upload to SCM is needed.
+ * + * @param shouldBeUploadedToSharedCache shouldBeUploadedToSharedCache + * of this request + */ + @Public + @Stable + public abstract void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dced082..401274a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1304,6 +1304,25 @@ public static final String DEFAULT_SCM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT; + /** the checksum algorithm implementation **/ + public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL = + SHARED_CACHE_PREFIX + "checksum.algo.impl"; + public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL = + "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl"; + + // node manager (uploader) configs + /** + * The replication factor for the node manager uploader for the shared cache. + */ + public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR = + SHARED_CACHE_PREFIX + "nm.uploader.replication.factor"; + public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR = + 10; + + public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = + SHARED_CACHE_PREFIX + "nm.uploader.thread-count"; + public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 48aac9d..9ffb2df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -159,6 +159,7 @@ message LocalResourceProto { optional LocalResourceTypeProto type = 4; optional LocalResourceVisibilityProto visibility = 5; optional string pattern = 6; + optional bool should_be_uploaded_to_shared_cache = 7; } message ApplicationResourceUsageReportProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java index 16bd597..560b081 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java @@ -192,6 +192,26 @@ public synchronized void setPattern(String pattern) { builder.setPattern(pattern); } + @Override + public synchronized boolean getShouldBeUploadedToSharedCache() { + LocalResourceProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasShouldBeUploadedToSharedCache()) { + return false; + } + return p.getShouldBeUploadedToSharedCache(); + } + + @Override + public synchronized void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache) { + maybeInitBuilder(); + if (!shouldBeUploadedToSharedCache) { + builder.clearShouldBeUploadedToSharedCache(); + return; + } + builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); + } + private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) { return ProtoUtils.convertToProtoFormat(e); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java new file mode 100644 index 0000000..2807d95 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An interface to calculate a checksum. The checksum implementation should be + * thread safe. + */ +public interface Checksum { + + /** + * Calculate the checksum of the passed input stream. + * + * @param in InputStream to be checksumed + * @return the message digest of the input stream + * @throws IOException + */ + public String computeChecksum(InputStream in) throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java new file mode 100644 index 0000000..d904911 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A factory class for creating checksum objects based on a configurable + * algorithm implementation. + */ +public class ChecksumFactory { + private static final ConcurrentMap<String, Checksum> instances = + new ConcurrentHashMap<String, Checksum>(); + + /** + * Get a new Checksum object based on the configurable algorithm + * implementation (see yarn.sharedcache.checksum.algo.impl) + * + * @return Checksum object + */ + public static Checksum getChecksum(Configuration conf) { + String className = + conf.get(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL, + YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); + Checksum checksum = instances.get(className); + if (checksum == null) { + try { + Class<?> clazz = Class.forName(className); + checksum = (Checksum) clazz.newInstance(); + Checksum old = instances.putIfAbsent(className, checksum); + if (old != null) { + checksum = old; + } + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + return checksum; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java new file mode 100644 index 0000000..f25b717 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.codec.digest.DigestUtils; + +/** + * The SHA-256 implementation of the shared cache checksum interface.
 */ +public class ChecksumSHA256Impl implements Checksum { + public String computeChecksum(InputStream in) throws IOException { + return DigestUtils.sha256Hex(in); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index 67c3bc7..93818fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -134,7 +134,7 @@ private void createDir(Path path, FsPermission perm) throws IOException { * otherwise */ @VisibleForTesting - static boolean isPublic(FileSystem fs, Path current, FileStatus sStat, + public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat, LoadingCache<Path,Future<FileStatus>> statCache) throws IOException { current = fs.makeQualified(current); //the leaf level file should be readable by others diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index fc8ca5d..6827763 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1351,13 +1351,31 @@ <name>yarn.sharedcache.manager.admin.thread-count</name> <value>1</value> </property> - + <property> <description>The address of the web application in the SCM (shared cache manager)</description> <name>yarn.sharedcache.manager.webapp.address</name> <value>0.0.0.0:8788</value> </property> + <property> + <description>The algorithm used to compute checksums of files (SHA-256 by default)</description> + <name>yarn.sharedcache.checksum.algo.impl</name> + <value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value> + </property> + + <property> + <description>The replication factor for the node manager uploader for the shared cache (10 by default)</description> + <name>yarn.sharedcache.nm.uploader.replication.factor</name> + <value>10</value> + </property> + + <property> + <description>The number of threads used to upload files from a node manager instance (20 by default)</description> + <name>yarn.sharedcache.nm.uploader.thread-count</name> + <value>20</value> + </property> + <property> <description>The interval that the yarn client library uses to poll the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index ac25c00..1c78f95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -94,7 +94,8 @@ public int compare(ContainerId c1, } public static LocalResource newLocalResource(URL url, LocalResourceType type, - LocalResourceVisibility visibility, long size, long timestamp) { + LocalResourceVisibility visibility, long size, long timestamp, + boolean shouldBeUploadedToSharedCache) { LocalResource resource = recordFactory.newRecordInstance(LocalResource.class); resource.setResource(url); @@ -102,14 +103,16 @@ public static LocalResource newLocalResource(URL url, LocalResourceType type, resource.setVisibility(visibility); resource.setSize(size); resource.setTimestamp(timestamp); + resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); return resource; } public static LocalResource
newLocalResource(URI uri, LocalResourceType type, LocalResourceVisibility visibility, long size, - long timestamp) { + long timestamp, + boolean shouldBeUploadedToSharedCache) { return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type, - visibility, size, timestamp); + visibility, size, timestamp, shouldBeUploadedToSharedCache); } public static ApplicationId newApplicationId(RecordFactory recordFactory, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ded2013..975a83c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -110,6 +110,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; @@ -213,6 +215,13 @@ public void serviceInit(Configuration conf) throws Exception { addIfService(logHandler); dispatcher.register(LogHandlerEventType.class, logHandler); + // add the shared cache upload service (it will do nothing if the shared + // cache is disabled) + SharedCacheUploadService sharedCacheUploader = + createSharedCacheUploaderService(); + addService(sharedCacheUploader); + dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); + waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + @@ -254,6 +263,10 @@ protected ResourceLocalizationService createResourceLocalizationService( deletionContext, dirsHandler, context.getNMStateStore()); } + protected SharedCacheUploadService createSharedCacheUploaderService() { + return new SharedCacheUploadService(); + } + protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 22e47e7..53a21a1 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -58,6 +59,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; @@ -100,6 +103,10 @@ new ArrayList(); private final List appRsrcs = new ArrayList(); + private final Map resourcesToBeUploaded = + new ConcurrentHashMap(); + private final Map resourcesUploadPolicies = + new ConcurrentHashMap(); public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -550,6 +557,8 @@ public ContainerState transition(ContainerImpl container, container.pendingResources.put(req, links); } links.add(rsrc.getKey()); + storeSharedCacheUploadPolicies(container, req, rsrc.getValue() + .getShouldBeUploadedToSharedCache()); switch (rsrc.getValue().getVisibility()) { case PUBLIC: container.publicRsrcs.add(req); @@ -600,6 +609,25 @@ public ContainerState transition(ContainerImpl container, } } + // Store the resource's shared cache upload policies + // Given LocalResourceRequest can be shared across containers in + // LocalResourcesTrackerImpl, we preserve the upload policies here. + // In addition, it is possible for the application to create several + // "identical" LocalResources as part of + // ContainerLaunchContext.setLocalResources with different symlinks. + // There is a corner case where these "identical" local resources have + // different upload policies. For that scenario, upload policy will be set to + // true as long as there is at least one LocalResource entry with + // upload policy set to true. + private static void storeSharedCacheUploadPolicies(ContainerImpl container, + LocalResourceRequest resourceRequest, Boolean uploadPolicy) { + Boolean storedUploadPolicy = + container.resourcesUploadPolicies.get(resourceRequest); + if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) { + container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy); + } + } + /** * Transition when one of the requested resources for this container * has been successfully localized. 
@@ -611,22 +639,37 @@ public ContainerState transition(ContainerImpl container, public ContainerState transition(ContainerImpl container, ContainerEvent event) { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; - List syms = - container.pendingResources.remove(rsrcEvent.getResource()); + LocalResourceRequest resourceRequest = rsrcEvent.getResource(); + Path location = rsrcEvent.getLocation(); + List syms = container.pendingResources.remove(resourceRequest); if (null == syms) { - LOG.warn("Localized unknown resource " + rsrcEvent.getResource() + + LOG.warn("Localized unknown resource " + resourceRequest + + " for container " + container.containerId); assert false; // fail container? return ContainerState.LOCALIZING; } - container.localizedResources.put(rsrcEvent.getLocation(), syms); + container.localizedResources.put(location, syms); + + // check to see if this resource should be uploaded to the shared cache + // as well + if (shouldBeUploadedToSharedCache(container, resourceRequest)) { + container.resourcesToBeUploaded.put(resourceRequest, location); + } if (!container.pendingResources.isEmpty()) { return ContainerState.LOCALIZING; } container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, ContainersLauncherEventType.LAUNCH_CONTAINER)); + + // kick off uploads to the shared cache + container.dispatcher.getEventHandler().handle( + new SharedCacheUploadEvent(container.resourcesToBeUploaded, container + .getLaunchContext(), container.getUser(), + SharedCacheUploadEventType.UPLOAD)); + container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -936,4 +979,13 @@ private boolean hasDefaultExitCode() { return (this.exitCode == ContainerExitStatus.INVALID); } + /** + * Returns whether the specific resource should be uploaded to the shared + * cache. 
+ */ + private static boolean shouldBeUploadedToSharedCache(ContainerImpl container, + LocalResourceRequest resource) { + return container.resourcesUploadPolicies.get(resource); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java index 70bead7..65a331a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java @@ -20,6 +20,8 @@ import java.net.URISyntaxException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -30,6 +32,8 @@ public class LocalResourceRequest extends LocalResource implements Comparable { + private static final Log LOG = LogFactory.getLog(LocalResourceRequest.class); + private final Path loc; private final long timestamp; private final LocalResourceType type; @@ -152,6 +156,17 @@ public String getPattern() { } @Override + public boolean getShouldBeUploadedToSharedCache() { + throw new UnsupportedOperationException(); + } + + @Override + public void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache) { + throw new UnsupportedOperationException(); + } + + @Override public void setResource(URL resource) { throw new UnsupportedOperationException(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java new file mode 100644 index 0000000..2f27770 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java @@ -0,0 +1,54 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +public class SharedCacheUploadEvent extends + AbstractEvent { + private final Map resources; + private final ContainerLaunchContext context; + private final String user; + + public SharedCacheUploadEvent(Map resources, + ContainerLaunchContext context, String user, + SharedCacheUploadEventType eventType) { + super(eventType); + this.resources = resources; + this.context = context; + this.user = user; + } + + public Map getResources() { + return resources; + } + + public ContainerLaunchContext getContainerLaunchContext() { + return context; + } + + public String getUser() { + return user; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java new file mode 100644 index 0000000..19280ec --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java @@ -0,0 +1,23 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +public enum SharedCacheUploadEventType { + UPLOAD +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java new file mode 100644 index 0000000..d32819c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java @@ -0,0 +1,122 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Service that uploads localized files to the shared cache. The upload is + * considered not critical, and is done on a best-effort basis. Failure to + * upload is not fatal. + */ +public class SharedCacheUploadService extends AbstractService implements + EventHandler { + private static final Log LOG = + LogFactory.getLog(SharedCacheUploadService.class); + + private boolean enabled; + private FileSystem fs; + private FileSystem localFs; + private ExecutorService uploaderPool; + private NMCacheUploaderSCMProtocol scmClient; + + public SharedCacheUploadService() { + super(SharedCacheUploadService.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, + YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED); + if (enabled) { + int threadCount = + conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT, + YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT); + uploaderPool = Executors.newFixedThreadPool(threadCount, + new ThreadFactoryBuilder(). + setNameFormat("Shared cache uploader #%d"). 
+ build()); + scmClient = createSCMClient(conf); + try { + fs = FileSystem.get(conf); + localFs = FileSystem.getLocal(conf); + } catch (IOException e) { + LOG.error("Unexpected exception in getting the filesystem", e); + throw new RuntimeException(e); + } + } + super.serviceInit(conf); + } + + private NMCacheUploaderSCMProtocol createSCMClient(Configuration conf) { + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_PORT); + return (NMCacheUploaderSCMProtocol)rpc.getProxy( + NMCacheUploaderSCMProtocol.class, scmAddress, conf); + } + + @Override + protected void serviceStop() throws Exception { + if (enabled) { + uploaderPool.shutdown(); + RPC.stopProxy(scmClient); + } + super.serviceStop(); + } + + @Override + public void handle(SharedCacheUploadEvent event) { + if (enabled) { + Map resources = event.getResources(); + for (Map.Entry e: resources.entrySet()) { + Uploader uploader = + new Uploader(e.getKey(), e.getValue(), event.getUser(), + getConfig(), scmClient, fs, localFs); + // fire off an upload task + uploaderPool.submit(uploader); + } + } + } + + public boolean isEnabled() { + return enabled; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/Uploader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/Uploader.java new file mode 100644 index 0000000..a3c1578 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/Uploader.java @@ -0,0 +1,290 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.URISyntaxException; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil; +import org.apache.hadoop.yarn.sharedcache.Checksum; +import org.apache.hadoop.yarn.sharedcache.ChecksumFactory; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.FSDownload; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The callable class that handles the actual upload. + */ +class Uploader implements Callable { + // rwxr-xr-x + static final FsPermission DIRECTORY_PERMISSION = + new FsPermission((short)00755); + // r-xr-xr-x + static final FsPermission FILE_PERMISSION = + new FsPermission((short)00555); + + private static final Log LOG = LogFactory.getLog(Uploader.class); + private static final ThreadLocal randomTl = new ThreadLocal() { + @Override + protected Random initialValue() { + return new Random(System.nanoTime()); + } + }; + + private final LocalResource resource; + private final Path localPath; + private final String user; + private final Configuration conf; + private final NMCacheUploaderSCMProtocol scmClient; + private final FileSystem fs; + private final FileSystem localFs; + private final String sharedCacheRootDir; + private final int nestedLevel; + private final Checksum checksum; + private final RecordFactory recordFactory; + + public Uploader(LocalResource resource, Path localPath, String user, + Configuration conf, NMCacheUploaderSCMProtocol scmClient) + throws IOException { + this(resource, localPath, user, conf, scmClient, + FileSystem.get(conf), localPath.getFileSystem(conf)); + } + + /** + * @param resource the local resource that contains the original remote path + * @param localPath the path in the local filesystem where the resource is + * localized + * @param fs the filesystem of the shared cache + * @param localFs the local filesystem + */ + public Uploader(LocalResource resource, Path localPath, String user, + Configuration conf, NMCacheUploaderSCMProtocol scmClient, FileSystem fs, + FileSystem localFs) { + this.resource = resource; + this.localPath = localPath; + this.user = user; + this.conf = conf; + this.scmClient = scmClient; + this.fs = fs; + this.sharedCacheRootDir = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + this.nestedLevel = CacheStructureUtil.getCacheDepth(conf); + this.checksum = ChecksumFactory.getChecksum(conf); + 
this.localFs = localFs; + this.recordFactory = RecordFactoryProvider.getRecordFactory(null); + } + + /** + * Uploads the file under the shared cache, and notifies the shared cache + * manager. If it is unable to upload the file because it already exists, it + * returns false. + */ + @Override + public Boolean call() throws Exception { + Path tempPath = null; + try { + if (!verifyAccess()) { + LOG.warn("User " + user + " is not authorized to upload file " + + localPath.getName()); + return false; + } + + // first determine the actual local path that will be used for upload + Path actualPath = getActualPath(); + // compute the checksum + String checksumVal = computeChecksum(actualPath); + // create the directory (if it doesn't exist) + Path directoryPath = + new Path(CacheStructureUtil.getCacheEntryPath(nestedLevel, + sharedCacheRootDir, checksumVal)); + // let's not check if the directory already exists: in the vast majority + // of the cases, the directory does not exist; as long as mkdirs does not + // error out if it exists, we should be fine + fs.mkdirs(directoryPath, DIRECTORY_PERMISSION); + // create the temporary file + tempPath = new Path(directoryPath, getTemporaryFileName(actualPath)); + if (!uploadFile(actualPath, tempPath)) { + LOG.warn("Could not copy the file to the shared cache at " + tempPath); + return false; + } + + // set the permission so that it is readable but not writable + // TODO should I create the file with the right permission so I save the + // permission call? + fs.setPermission(tempPath, FILE_PERMISSION); + // rename it to the final filename + Path finalPath = new Path(directoryPath, actualPath.getName()); + if (!fs.rename(tempPath, finalPath)) { + LOG.warn("The file already exists under " + finalPath + + ". Ignoring this attempt."); + deleteTempFile(tempPath); + return false; + } + + // notify the SCM + if (!notifySharedCacheManager(checksumVal, actualPath.getName())) { + // the shared cache manager rejected the upload (as it is likely + // uploaded under a different name + // clean up this file and exit + fs.delete(finalPath, false); + return false; + } + + // set the replication factor + short replication = + (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR, + YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR); + fs.setReplication(finalPath, replication); + LOG.info("File " + actualPath.getName() + + " was uploaded to the shared cache at " + finalPath); + return true; + } catch (IOException e) { + LOG.warn("Exception while uploading the file " + localPath.getName(), e); + // in case an exception is thrown, delete the temp file + deleteTempFile(tempPath); + throw e; + } + } + + @VisibleForTesting + Path getActualPath() throws IOException { + Path path = localPath; + FileStatus status = localFs.getFileStatus(path); + if (status != null && status.isDirectory()) { + // for certain types of resources that get unpacked, the original file may + // be found under the directory with the same name (see + // FSDownload.unpack); check if the path is a directory and if so look + // under it + path = new Path(path, path.getName()); + } + return path; + } + + private void deleteTempFile(Path tempPath) { + try { + if (tempPath != null && fs.exists(tempPath)) { + fs.delete(tempPath, false); + } + } catch (IOException ignore) {} + } + + /** + * Checks that the (original) remote file is either owned by the user who + * started the app or public. 
+ */ + @VisibleForTesting + boolean verifyAccess() throws IOException { + // if it is in the public cache, it's trivially OK + if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { + return true; + } + + final Path remotePath; + try { + remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource()); + } catch (URISyntaxException e) { + throw new IOException("Invalid resource", e); + } + + // get the file status of the HDFS file + FileSystem remoteFs = remotePath.getFileSystem(conf); + FileStatus status = remoteFs.getFileStatus(remotePath); + // check to see if the file has been modified in any way + if (status.getModificationTime() != resource.getTimestamp()) { + LOG.warn("The remote file " + remotePath + + " has changed since it's localized; will not consider it for upload"); + return false; + } + + // check for the user ownership + if (status.getOwner().equals(user)) { + return true; // the user owns the file + } + // check if the file is publicly readable otherwise + return fileIsPublic(remotePath, remoteFs, status); + } + + @VisibleForTesting + boolean fileIsPublic(final Path remotePath, FileSystem remoteFs, + FileStatus status) throws IOException { + return FSDownload.isPublic(remoteFs, remotePath, status, null); + } + + /** + * Uploads the file to the shared cache under a temporary name, and returns + * the result. + */ + @VisibleForTesting + boolean uploadFile(Path sourcePath, Path tempPath) throws IOException { + return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf); + } + + @VisibleForTesting + String computeChecksum(Path path) throws IOException { + InputStream is = localFs.open(path); + try { + return checksum.computeChecksum(is); + } finally { + try { is.close(); } catch (IOException ignore) {} + } + } + + private String getTemporaryFileName(Path path) { + return path.getName() + "-" + randomTl.get().nextLong(); + } + + @VisibleForTesting + boolean notifySharedCacheManager(String checksumVal, String fileName) + throws IOException { + try { + NotifySCMRequest request = + recordFactory.newRecordInstance(NotifySCMRequest.class); + request.setResourceKey(checksumVal); + request.setFilename(fileName); + return scmClient.notify(request).getAccepted(); + } catch (YarnException e) { + throw new IOException(e); + } catch (UndeclaredThrowableException e) { + // retrieve the cause of the exception and throw it as an IOException + throw new IOException(e.getCause() == null ? 
e : e.getCause()); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 6ad6010..3dcb510 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -641,7 +641,7 @@ public boolean matches(Object o) { URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name); LocalResource rsrc = BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis, - r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L); + r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false); return new SimpleEntry(name, rsrc); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index ed59ddd..c7ae2cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1645,7 +1645,7 @@ private static LocalResource getMockedResource(Random r, URL url = getPath("/local/PRIVATE/" + name); LocalResource rsrc = BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis, - r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L); + r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false); return rsrc; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java new file mode 100644 index 0000000..1b2b2f0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java @@ -0,0 +1,50 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import static org.junit.Assert.assertSame; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +public class TestSharedCacheUploadService { + + @Test + public void testInitDisabled() { + testInit(false); + } + + @Test + public void testInitEnabled() { + testInit(true); + } + + public void testInit(boolean enabled) { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled); + + SharedCacheUploadService service = new SharedCacheUploadService(); + service.init(conf); + assertSame(enabled, service.isEnabled()); + + service.stop(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestUploader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestUploader.java new file mode 100644 index 0000000..e9eea0e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestUploader.java @@ -0,0 +1,244 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; +import org.junit.Test; + +public class TestUploader { + + /** + * If verifyAccess fails, the upload should fail + */ + @Test + public void testFailVerifyAccess() throws Exception { + Uploader spied = createSpiedUploader(); + doReturn(false).when(spied).verifyAccess(); + + assertFalse(spied.call()); + } + + /** + * If rename fails, the upload should fail + */ + @Test + public void testRenameFail() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + NMCacheUploaderSCMProtocol scmClient = + mock(NMCacheUploaderSCMProtocol.class); + NotifySCMResponse response = mock(NotifySCMResponse.class); + when(response.getAccepted()).thenReturn(true); + when(scmClient.notify(isA(NotifySCMRequest.class))).thenReturn(response); + FileSystem fs = mock(FileSystem.class); + // return false when rename is called + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false); + FileSystem localFs = FileSystem.getLocal(conf); + Uploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + // stub verifyAccess() to return true + doReturn(true).when(spied).verifyAccess(); + // stub getActualPath() + doReturn(localPath).when(spied).getActualPath(); + // stub computeChecksum() + doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class)); + // stub uploadFile() to return true + doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class)); + + assertFalse(spied.call()); + } + + /** + * If verifyAccess, uploadFile, rename, and notification succeed, the upload + * should succeed + */ + @Test + public void testSuccess() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + NMCacheUploaderSCMProtocol scmClient = + mock(NMCacheUploaderSCMProtocol.class); + NotifySCMResponse response = mock(NotifySCMResponse.class); + when(response.getAccepted()).thenReturn(true); + when(scmClient.notify(isA(NotifySCMRequest.class))).thenReturn(response); + FileSystem fs = 
mock(FileSystem.class); + // return false when rename is called + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + FileSystem localFs = FileSystem.getLocal(conf); + Uploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + // stub verifyAccess() to return true + doReturn(true).when(spied).verifyAccess(); + // stub getActualPath() + doReturn(localPath).when(spied).getActualPath(); + // stub computeChecksum() + doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class)); + // stub uploadFile() to return true + doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class)); + // stub notifySharedCacheManager to return true + doReturn(true).when(spied).notifySharedCacheManager(isA(String.class), + isA(String.class)); + + assertTrue(spied.call()); + } + + /** + * If verifyAccess, uploadFile, and rename succed, but it receives a nay from + * SCM, the file should be deleted + */ + @Test + public void testNotifySCMFail() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + FileSystem fs = mock(FileSystem.class); + // return false when rename is called + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + FileSystem localFs = FileSystem.getLocal(conf); + Uploader spied = + createSpiedUploader(resource, localPath, user, conf, null, fs, + localFs); + // stub verifyAccess() to return true + doReturn(true).when(spied).verifyAccess(); + // stub getActualPath() + doReturn(localPath).when(spied).getActualPath(); + // stub computeChecksum() + doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class)); + // stub uploadFile() to return true + doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class)); + // stub notifySharedCacheManager to return true + doReturn(false).when(spied).notifySharedCacheManager(isA(String.class), + isA(String.class)); + + assertFalse(spied.call()); + verify(fs).delete(isA(Path.class), anyBoolean()); + } + + /** + * If resource is public, verifyAccess should succeed + */ + @Test + public void testVerifyAccessPublicResource() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + // give public visibility + when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + NMCacheUploaderSCMProtocol scmClient = + mock(NMCacheUploaderSCMProtocol.class); + FileSystem fs = mock(FileSystem.class); + FileSystem localFs = FileSystem.getLocal(conf); + Uploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + + assertTrue(spied.verifyAccess()); + } + + /** + * If the localPath does not exists, getActualPath should get to one level + * down + */ + @Test + public void testGetActualPath() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + // give public visibility + when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); + Path localPath = new Path("foo.jar"); + String user = "joe"; + 
NMCacheUploaderSCMProtocol scmClient = + mock(NMCacheUploaderSCMProtocol.class); + FileSystem fs = mock(FileSystem.class); + FileSystem localFs = mock(FileSystem.class); + // stub it to return a status that indicates a directory + FileStatus status = mock(FileStatus.class); + when(status.isDirectory()).thenReturn(true); + when(localFs.getFileStatus(localPath)).thenReturn(status); + Uploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + + Path actualPath = spied.getActualPath(); + assertEquals(actualPath.getName(), localPath.getName()); + assertEquals(actualPath.getParent().getName(), localPath.getName()); + } + + private Uploader createSpiedUploader() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + String user = "foo"; + NMCacheUploaderSCMProtocol scmClient = + mock(NMCacheUploaderSCMProtocol.class); + FileSystem fs = FileSystem.get(conf); + FileSystem localFs = FileSystem.getLocal(conf); + return createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + } + + private Uploader createSpiedUploader(LocalResource resource, Path localPath, + String user, Configuration conf, NMCacheUploaderSCMProtocol scmClient, + FileSystem fs, FileSystem localFs) + throws IOException { + Uploader uploader = new Uploader(resource, localPath, user, conf, scmClient, + fs, localFs); + return spy(uploader); + } +}
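
Note (not part of the patch): for reviewers who want to exercise the new API, a minimal, hypothetical sketch of how an application client could mark a resource for shared-cache upload through the new LocalResource.newInstance overload. The class name, HDFS path, and resource name are placeholders; only the trailing boolean argument comes from this patch.

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class SharedCacheClientExample {
  public static void addJobJar(FileSystem fs, ContainerLaunchContext ctx)
      throws IOException {
    Path jar = new Path("/tmp/job.jar"); // placeholder path
    FileStatus stat = fs.getFileStatus(jar);
    // The trailing 'true' is the new shouldBeUploadedToSharedCache flag; it asks
    // the NM to upload the localized file to the shared cache once localization
    // succeeds. Everything else is the pre-existing LocalResource contract.
    LocalResource rsrc = LocalResource.newInstance(
        ConverterUtils.getYarnUrlFromPath(fs.makeQualified(jar)),
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
        stat.getLen(), stat.getModificationTime(), true);
    ctx.setLocalResources(Collections.singletonMap("job.jar", rsrc));
  }
}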
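
Note (not part of the patch): the new keys added to YarnConfiguration and yarn-default.xml can also be set programmatically. A sketch of the NM-side knobs follows; the values simply repeat the defaults, and SHARED_CACHE_ENABLED is assumed to exist already from the SCM patches this change builds on.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SharedCacheUploadConfigExample {
  public static YarnConfiguration uploadEnabledConf() {
    YarnConfiguration conf = new YarnConfiguration();
    // SharedCacheUploadService is a no-op unless the shared cache is enabled.
    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
    // Checksum implementation used to key entries in the cache (SHA-256 by default).
    conf.set(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
        YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
    // HDFS replication factor applied to a file after a successful upload.
    conf.setInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR, 10);
    // Size of the per-NM uploader thread pool.
    conf.setInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT, 20);
    return conf;
  }
}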
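
Note (not part of the patch): Checksum and ChecksumFactory form a plugin point, so an alternative algorithm only needs a thread-safe class with a no-arg constructor. A hypothetical MD5 variant, shown purely to illustrate the extension mechanism:

package org.apache.hadoop.yarn.sharedcache; // example only, not included in this patch

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.codec.digest.DigestUtils;

/**
 * Example alternative implementation of the shared cache checksum interface.
 * It is stateless, so the single instance cached by ChecksumFactory is safe to
 * share across uploader threads.
 */
public class ChecksumMD5Impl implements Checksum {
  @Override
  public String computeChecksum(InputStream in) throws IOException {
    // DigestUtils streams the input, so large localized files are handled fine.
    return DigestUtils.md5Hex(in);
  }
}

It would then be selected by setting yarn.sharedcache.checksum.algo.impl to org.apache.hadoop.yarn.sharedcache.ChecksumMD5Impl; since ChecksumFactory caches one instance per class name, implementations must be stateless or otherwise thread safe.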
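
Note (not part of the patch): a quick smoke test of the default factory wiring; the expected output is the standard SHA-256 test vector for the string "abc".

import java.io.ByteArrayInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.sharedcache.Checksum;
import org.apache.hadoop.yarn.sharedcache.ChecksumFactory;

public class ChecksumSmokeTest {
  public static void main(String[] args) throws Exception {
    // With no override configured, the factory returns ChecksumSHA256Impl.
    Checksum checksum = ChecksumFactory.getChecksum(new Configuration());
    String hex = checksum.computeChecksum(
        new ByteArrayInputStream("abc".getBytes("UTF-8")));
    // Expected: ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
    System.out.println(hex);
  }
}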
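
Note (not part of the patch): the corner case that storeSharedCacheUploadPolicies in ContainerImpl guards against can be pictured with two launch-context entries that point at the same file under different symlinks but disagree on the upload flag. The class name, symlink names, path, size, and timestamp below are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class UploadPolicyCornerCaseExample {
  public static void addAliasedJar(ContainerLaunchContext ctx) {
    URL url = ConverterUtils.getYarnUrlFromPath(new Path("hdfs:///tmp/lib.jar")); // placeholder
    long size = 1024L;   // placeholder
    long timestamp = 0L; // placeholder
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    localResources.put("lib.jar", LocalResource.newInstance(url,
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
        size, timestamp, false));
    localResources.put("lib-alias.jar", LocalResource.newInstance(url,
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
        size, timestamp, true));
    // Both entries collapse to one LocalResourceRequest inside the NM; since at
    // least one asked for upload, the stored policy resolves to true and the
    // localized file is offered to the shared cache exactly once.
    ctx.setLocalResources(localResources);
  }
}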