diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
index f14a136..7ece41b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
@@ -48,6 +48,14 @@
public static LocalResource newInstance(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp,
String pattern) {
+ return newInstance(url, type, visibility, size, timestamp, pattern, false);
+ }
+
+ @Public
+ @Stable
+ public static LocalResource newInstance(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp,
+ String pattern, boolean shouldBeUploadedToSharedCache) {
LocalResource resource = Records.newRecord(LocalResource.class);
resource.setResource(url);
resource.setType(type);
@@ -55,6 +63,7 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
resource.setSize(size);
resource.setTimestamp(timestamp);
resource.setPattern(pattern);
+ resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
@@ -65,6 +74,15 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
return newInstance(url, type, visibility, size, timestamp, null);
}
+ @Public
+ @Stable
+ public static LocalResource newInstance(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp,
+ boolean shouldBeUploadedToSharedCache) {
+ return newInstance(url, type, visibility, size, timestamp, null,
+ shouldBeUploadedToSharedCache);
+ }
+
/**
* Get the location of the resource to be localized.
* @return location of the resource to be localized
@@ -170,4 +188,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Public
@Stable
public abstract void setPattern(String pattern);
+
+ /**
+   * NM uses it to decide whether it is necessary to upload the resource
+   * to the shared cache.
+ */
+ @Public
+ @Stable
+ public abstract boolean getShouldBeUploadedToSharedCache();
+
+ /**
+ * Inform NM whether upload to SCM is needed.
+ *
+   * @param shouldBeUploadedToSharedCache whether the resource should be
+   *          uploaded to the shared cache
+ */
+ @Public
+ @Stable
+ public abstract void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 09ea5e5..7e9dd3b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1340,6 +1340,25 @@
public static final String DEFAULT_SCM_WEBAPP_ADDRESS = "0.0.0.0:"
+ DEFAULT_SCM_WEBAPP_PORT;
+  /** The checksum algorithm implementation. */
+ public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+ SHARED_CACHE_PREFIX + "checksum.algo.impl";
+ public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+ "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl";
+
+ // node manager (uploader) configs
+ /**
+ * The replication factor for the node manager uploader for the shared cache.
+ */
+ public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+ SHARED_CACHE_PREFIX + "nm.uploader.replication.factor";
+ public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
+ 10;
+
+ public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT =
+ SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
+ public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3f1fa6c..33e017c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -159,6 +159,7 @@ message LocalResourceProto {
optional LocalResourceTypeProto type = 4;
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
+ optional bool should_be_uploaded_to_shared_cache = 7;
}
message ApplicationResourceUsageReportProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
index 16bd597..560b081 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
@@ -192,6 +192,26 @@ public synchronized void setPattern(String pattern) {
builder.setPattern(pattern);
}
+ @Override
+ public synchronized boolean getShouldBeUploadedToSharedCache() {
+ LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasShouldBeUploadedToSharedCache()) {
+ return false;
+ }
+ return p.getShouldBeUploadedToSharedCache();
+ }
+
+ @Override
+ public synchronized void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache) {
+ maybeInitBuilder();
+ if (!shouldBeUploadedToSharedCache) {
+ builder.clearShouldBeUploadedToSharedCache();
+ return;
+ }
+ builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
+ }
+
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java
new file mode 100644
index 0000000..2807d95
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interface to calculate a checksum. The checksum implementation should be
+ * thread safe.
+ */
+public interface Checksum {
+
+ /**
+ * Calculate the checksum of the passed input stream.
+ *
+   * @param in InputStream to be checksummed
+ * @return the message digest of the input stream
+ * @throws IOException
+ */
+ public String computeChecksum(InputStream in) throws IOException;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java
new file mode 100644
index 0000000..d904911
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * A factory class for creating checksum objects based on a configurable
+ * algorithm implementation
+ */
+public class ChecksumFactory {
+  private static final ConcurrentMap<String, Checksum> instances =
+      new ConcurrentHashMap<String, Checksum>();
+
+ /**
+ * Get a new Checksum object based on the configurable algorithm
+ * implementation (see yarn.sharedcache.checksum.algo.impl)
+ *
+ * @return Checksum object
+ */
+ public static Checksum getChecksum(Configuration conf) {
+ String className =
+ conf.get(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+ Checksum checksum = instances.get(className);
+ if (checksum == null) {
+ try {
+        Class<?> clazz = Class.forName(className);
+ checksum = (Checksum) clazz.newInstance();
+ Checksum old = instances.putIfAbsent(className, checksum);
+ if (old != null) {
+ checksum = old;
+ }
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ return checksum;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
new file mode 100644
index 0000000..f25b717
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+/**
+ * The SHA-256 implementation of the shared cache checksum interface.
+ */
+public class ChecksumSHA256Impl implements Checksum {
+ public String computeChecksum(InputStream in) throws IOException {
+ return DigestUtils.sha256Hex(in);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 8cc5ed3..a32f08c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -134,7 +134,7 @@ private void createDir(Path path, FsPermission perm) throws IOException {
* otherwise
*/
@VisibleForTesting
- static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+ public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
       LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
current = fs.makeQualified(current);
//the leaf level file should be readable by others
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4d02082..e0d0ada 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1429,13 +1429,31 @@
     <name>yarn.sharedcache.manager.admin.thread-count</name>
     <value>1</value>
   </property>
-
+
   <property>
     <description>The address of the web application in the SCM (shared cache manager)</description>
     <name>yarn.sharedcache.manager.webapp.address</name>
     <value>0.0.0.0:8788</value>
   </property>
+
+  <property>
+    <description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
+    <name>yarn.sharedcache.checksum.algo.impl</name>
+    <value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
+  </property>
+
+  <property>
+    <description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
+    <name>yarn.sharedcache.nm.uploader.replication.factor</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>The number of threads used to upload files from a node manager instance (20 by default)</description>
+    <name>yarn.sharedcache.nm.uploader.thread-count</name>
+    <value>20</value>
+  </property>
+
   <property>
     <description>The interval that the yarn client library uses to poll the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 64eb428..ab89e4a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -94,7 +94,8 @@ public int compare(ContainerId c1,
}
public static LocalResource newLocalResource(URL url, LocalResourceType type,
- LocalResourceVisibility visibility, long size, long timestamp) {
+ LocalResourceVisibility visibility, long size, long timestamp,
+ boolean shouldBeUploadedToSharedCache) {
LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(url);
@@ -102,14 +103,16 @@ public static LocalResource newLocalResource(URL url, LocalResourceType type,
resource.setVisibility(visibility);
resource.setSize(size);
resource.setTimestamp(timestamp);
+ resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
public static LocalResource newLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility, long size,
- long timestamp) {
+ long timestamp,
+ boolean shouldBeUploadedToSharedCache) {
return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
- visibility, size, timestamp);
+ visibility, size, timestamp, shouldBeUploadedToSharedCache);
}
public static ApplicationId newApplicationId(RecordFactory recordFactory,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 12166e0..098953b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -117,6 +117,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
@@ -224,6 +226,13 @@ public void serviceInit(Configuration conf) throws Exception {
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
+ // add the shared cache upload service (it will do nothing if the shared
+ // cache is disabled)
+ SharedCacheUploadService sharedCacheUploader =
+ createSharedCacheUploaderService();
+ addService(sharedCacheUploader);
+ dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
+
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -358,6 +367,10 @@ protected ResourceLocalizationService createResourceLocalizationService(
deletionContext, dirsHandler, context.getNMStateStore());
}
+ protected SharedCacheUploadService createSharedCacheUploaderService() {
+ return new SharedCacheUploadService();
+ }
+
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index fa54ee1..0f49ffb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -59,6 +60,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -104,6 +107,10 @@
     new ArrayList<LocalResourceRequest>();
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
+  private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
+      new ConcurrentHashMap<LocalResourceRequest, Path>();
+  private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
+      new ConcurrentHashMap<LocalResourceRequest, Boolean>();
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
@@ -513,7 +520,7 @@ private void sendLaunchEvent() {
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
// try to recover a container that was previously launched
launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
- }
+ }
dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(this, launcherEvent));
}
@@ -637,6 +644,8 @@ public ContainerState transition(ContainerImpl container,
container.pendingResources.put(req, links);
}
links.add(rsrc.getKey());
+ storeSharedCacheUploadPolicies(container, req, rsrc.getValue()
+ .getShouldBeUploadedToSharedCache());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
@@ -685,31 +694,76 @@ public ContainerState transition(ContainerImpl container,
}
}
+ // Store the resource's shared cache upload policies
+ // Given LocalResourceRequest can be shared across containers in
+ // LocalResourcesTrackerImpl, we preserve the upload policies here.
+ // In addition, it is possible for the application to create several
+ // "identical" LocalResources as part of
+ // ContainerLaunchContext.setLocalResources with different symlinks.
+ // There is a corner case where these "identical" local resources have
+ // different upload policies. For that scenario, upload policy will be set to
+ // true as long as there is at least one LocalResource entry with
+ // upload policy set to true.
+ private static void storeSharedCacheUploadPolicies(ContainerImpl container,
+ LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
+ Boolean storedUploadPolicy =
+ container.resourcesUploadPolicies.get(resourceRequest);
+ if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
+ container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
+ }
+ }
+
/**
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
+ @SuppressWarnings("unchecked")
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
-      List<String> syms =
-          container.pendingResources.remove(rsrcEvent.getResource());
+ LocalResourceRequest resourceRequest = rsrcEvent.getResource();
+ Path location = rsrcEvent.getLocation();
+      List<String> syms = container.pendingResources.remove(resourceRequest);
if (null == syms) {
- LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
+ LOG.warn("Localized unknown resource " + resourceRequest
+ +
" for container " + container.containerId);
assert false;
// fail container?
return ContainerState.LOCALIZING;
}
- container.localizedResources.put(rsrcEvent.getLocation(), syms);
+ container.localizedResources.put(location, syms);
+
+ // check to see if this resource should be uploaded to the shared cache
+ // as well
+ if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
+ container.resourcesToBeUploaded.put(resourceRequest, location);
+ }
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
container.sendLaunchEvent();
+
+ // If this is a recovered container that has already launched, skip
+ // uploading resources to the shared cache. We do this to avoid uploading
+ // the same resources multiple times. The tradeoff is that in the case of
+ // a recovered container, there is a chance that resources don't get
+ // uploaded into the shared cache. This is OK because resources are not
+ // acknowledged by the SCM until they have been uploaded by the node
+ // manager.
+ if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
+ && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
+ // kick off uploads to the shared cache
+ container.dispatcher.getEventHandler().handle(
+ new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
+ .getLaunchContext(), container.getUser(),
+ SharedCacheUploadEventType.UPLOAD));
+ }
+
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -1018,4 +1072,13 @@ public String toString() {
private boolean hasDefaultExitCode() {
return (this.exitCode == ContainerExitStatus.INVALID);
}
+
+ /**
+ * Returns whether the specific resource should be uploaded to the shared
+ * cache.
+ */
+ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
+ LocalResourceRequest resource) {
+ return container.resourcesUploadPolicies.get(resource);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
index 70bead7..65a331a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
@@ -20,6 +20,8 @@
import java.net.URISyntaxException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -30,6 +32,8 @@
public class LocalResourceRequest
     extends LocalResource implements Comparable<LocalResourceRequest> {
+ private static final Log LOG = LogFactory.getLog(LocalResourceRequest.class);
+
private final Path loc;
private final long timestamp;
private final LocalResourceType type;
@@ -152,6 +156,17 @@ public String getPattern() {
}
@Override
+ public boolean getShouldBeUploadedToSharedCache() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void setResource(URL resource) {
throw new UnsupportedOperationException();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
new file mode 100644
index 0000000..2f27770
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java
@@ -0,0 +1,54 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+public class SharedCacheUploadEvent extends
+    AbstractEvent<SharedCacheUploadEventType> {
+  private final Map<LocalResourceRequest, Path> resources;
+ private final ContainerLaunchContext context;
+ private final String user;
+
+  public SharedCacheUploadEvent(Map<LocalResourceRequest, Path> resources,
+ ContainerLaunchContext context, String user,
+ SharedCacheUploadEventType eventType) {
+ super(eventType);
+ this.resources = resources;
+ this.context = context;
+ this.user = user;
+ }
+
+  public Map<LocalResourceRequest, Path> getResources() {
+ return resources;
+ }
+
+ public ContainerLaunchContext getContainerLaunchContext() {
+ return context;
+ }
+
+ public String getUser() {
+ return user;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
new file mode 100644
index 0000000..19280ec
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+public enum SharedCacheUploadEventType {
+ UPLOAD
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
new file mode 100644
index 0000000..d32819c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
@@ -0,0 +1,122 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service that uploads localized files to the shared cache. The upload is
+ * considered not critical, and is done on a best-effort basis. Failure to
+ * upload is not fatal.
+ */
+public class SharedCacheUploadService extends AbstractService implements
+    EventHandler<SharedCacheUploadEvent> {
+ private static final Log LOG =
+ LogFactory.getLog(SharedCacheUploadService.class);
+
+ private boolean enabled;
+ private FileSystem fs;
+ private FileSystem localFs;
+ private ExecutorService uploaderPool;
+ private NMCacheUploaderSCMProtocol scmClient;
+
+ public SharedCacheUploadService() {
+ super(SharedCacheUploadService.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED);
+ if (enabled) {
+ int threadCount =
+ conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
+ uploaderPool = Executors.newFixedThreadPool(threadCount,
+ new ThreadFactoryBuilder().
+ setNameFormat("Shared cache uploader #%d").
+ build());
+ scmClient = createSCMClient(conf);
+ try {
+ fs = FileSystem.get(conf);
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException e) {
+ LOG.error("Unexpected exception in getting the filesystem", e);
+ throw new RuntimeException(e);
+ }
+ }
+ super.serviceInit(conf);
+ }
+
+ private NMCacheUploaderSCMProtocol createSCMClient(Configuration conf) {
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress scmAddress =
+ conf.getSocketAddr(YarnConfiguration.NM_SCM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_SCM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_SCM_PORT);
+ return (NMCacheUploaderSCMProtocol)rpc.getProxy(
+ NMCacheUploaderSCMProtocol.class, scmAddress, conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (enabled) {
+ uploaderPool.shutdown();
+ RPC.stopProxy(scmClient);
+ }
+ super.serviceStop();
+ }
+
+  @Override
+  public void handle(SharedCacheUploadEvent event) {
+    // no-op when the shared cache is disabled
+    if (enabled) {
+      Map<LocalResourceRequest, Path> resources = event.getResources();
+      for (Map.Entry<LocalResourceRequest, Path> e : resources.entrySet()) {
+        Uploader uploader =
+            new Uploader(e.getKey(), e.getValue(), event.getUser(),
+                getConfig(), scmClient, fs, localFs);
+        // fire off an upload task
+        uploaderPool.submit(uploader);
+      }
+    }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/Uploader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/Uploader.java
new file mode 100644
index 0000000..a3c1578
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/Uploader.java
@@ -0,0 +1,290 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URISyntaxException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest;
+import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil;
+import org.apache.hadoop.yarn.sharedcache.Checksum;
+import org.apache.hadoop.yarn.sharedcache.ChecksumFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The callable class that handles the actual upload. Returns true on a
+ * successful upload, false otherwise.
+ */
+class Uploader implements Callable<Boolean> {
+  // rwxr-xr-x
+  static final FsPermission DIRECTORY_PERMISSION =
+      new FsPermission((short)00755);
+  // r-xr-xr-x
+  static final FsPermission FILE_PERMISSION =
+      new FsPermission((short)00555);
+
+  private static final Log LOG = LogFactory.getLog(Uploader.class);
+  private static final ThreadLocal<Random> randomTl = new ThreadLocal<Random>() {
+    @Override
+    protected Random initialValue() {
+      return new Random(System.nanoTime());
+    }
+  };
+
+ private final LocalResource resource;
+ private final Path localPath;
+ private final String user;
+ private final Configuration conf;
+ private final NMCacheUploaderSCMProtocol scmClient;
+ private final FileSystem fs;
+ private final FileSystem localFs;
+ private final String sharedCacheRootDir;
+ private final int nestedLevel;
+ private final Checksum checksum;
+ private final RecordFactory recordFactory;
+
+ public Uploader(LocalResource resource, Path localPath, String user,
+ Configuration conf, NMCacheUploaderSCMProtocol scmClient)
+ throws IOException {
+ this(resource, localPath, user, conf, scmClient,
+ FileSystem.get(conf), localPath.getFileSystem(conf));
+ }
+
+ /**
+ * @param resource the local resource that contains the original remote path
+ * @param localPath the path in the local filesystem where the resource is
+ * localized
+ * @param fs the filesystem of the shared cache
+ * @param localFs the local filesystem
+ */
+ public Uploader(LocalResource resource, Path localPath, String user,
+ Configuration conf, NMCacheUploaderSCMProtocol scmClient, FileSystem fs,
+ FileSystem localFs) {
+ this.resource = resource;
+ this.localPath = localPath;
+ this.user = user;
+ this.conf = conf;
+ this.scmClient = scmClient;
+ this.fs = fs;
+ this.sharedCacheRootDir =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ this.nestedLevel = CacheStructureUtil.getCacheDepth(conf);
+ this.checksum = ChecksumFactory.getChecksum(conf);
+ this.localFs = localFs;
+ this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ }
+
+ /**
+ * Uploads the file under the shared cache, and notifies the shared cache
+ * manager. If it is unable to upload the file because it already exists, it
+ * returns false.
+ */
+ @Override
+ public Boolean call() throws Exception {
+ Path tempPath = null;
+ try {
+ if (!verifyAccess()) {
+ LOG.warn("User " + user + " is not authorized to upload file " +
+ localPath.getName());
+ return false;
+ }
+
+ // first determine the actual local path that will be used for upload
+ Path actualPath = getActualPath();
+ // compute the checksum
+ String checksumVal = computeChecksum(actualPath);
+ // create the directory (if it doesn't exist)
+ Path directoryPath =
+ new Path(CacheStructureUtil.getCacheEntryPath(nestedLevel,
+ sharedCacheRootDir, checksumVal));
+ // let's not check if the directory already exists: in the vast majority
+ // of the cases, the directory does not exist; as long as mkdirs does not
+ // error out if it exists, we should be fine
+ fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
+ // create the temporary file
+ tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
+ if (!uploadFile(actualPath, tempPath)) {
+ LOG.warn("Could not copy the file to the shared cache at " + tempPath);
+ return false;
+ }
+
+ // set the permission so that it is readable but not writable
+ // TODO should I create the file with the right permission so I save the
+ // permission call?
+ fs.setPermission(tempPath, FILE_PERMISSION);
+ // rename it to the final filename
+ Path finalPath = new Path(directoryPath, actualPath.getName());
+ if (!fs.rename(tempPath, finalPath)) {
+ LOG.warn("The file already exists under " + finalPath +
+ ". Ignoring this attempt.");
+ deleteTempFile(tempPath);
+ return false;
+ }
+
+      // notify the SCM
+      if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
+        // the shared cache manager rejected the upload (as it was likely
+        // uploaded under a different name);
+        // clean up this file and exit
+        fs.delete(finalPath, false);
+        return false;
+      }
+
+ // set the replication factor
+ short replication =
+ (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
+ fs.setReplication(finalPath, replication);
+ LOG.info("File " + actualPath.getName() +
+ " was uploaded to the shared cache at " + finalPath);
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Exception while uploading the file " + localPath.getName(), e);
+ // in case an exception is thrown, delete the temp file
+ deleteTempFile(tempPath);
+ throw e;
+ }
+ }
+
+ @VisibleForTesting
+ Path getActualPath() throws IOException {
+ Path path = localPath;
+ FileStatus status = localFs.getFileStatus(path);
+ if (status != null && status.isDirectory()) {
+ // for certain types of resources that get unpacked, the original file may
+ // be found under the directory with the same name (see
+ // FSDownload.unpack); check if the path is a directory and if so look
+ // under it
+ path = new Path(path, path.getName());
+ }
+ return path;
+ }
+
+ private void deleteTempFile(Path tempPath) {
+ try {
+ if (tempPath != null && fs.exists(tempPath)) {
+ fs.delete(tempPath, false);
+ }
+ } catch (IOException ignore) {}
+ }
+
+ /**
+ * Checks that the (original) remote file is either owned by the user who
+ * started the app or public.
+ */
+ @VisibleForTesting
+ boolean verifyAccess() throws IOException {
+ // if it is in the public cache, it's trivially OK
+ if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
+ return true;
+ }
+
+ final Path remotePath;
+ try {
+ remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+ } catch (URISyntaxException e) {
+ throw new IOException("Invalid resource", e);
+ }
+
+ // get the file status of the HDFS file
+ FileSystem remoteFs = remotePath.getFileSystem(conf);
+ FileStatus status = remoteFs.getFileStatus(remotePath);
+ // check to see if the file has been modified in any way
+    if (status.getModificationTime() != resource.getTimestamp()) {
+      LOG.warn("The remote file " + remotePath +
+          " has changed since it was localized; will not consider it for upload");
+      return false;
+    }
+
+ // check for the user ownership
+ if (status.getOwner().equals(user)) {
+ return true; // the user owns the file
+ }
+ // check if the file is publicly readable otherwise
+ return fileIsPublic(remotePath, remoteFs, status);
+ }
+
+ @VisibleForTesting
+ boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
+ FileStatus status) throws IOException {
+ return FSDownload.isPublic(remoteFs, remotePath, status, null);
+ }
+
+ /**
+ * Uploads the file to the shared cache under a temporary name, and returns
+ * the result.
+ */
+ @VisibleForTesting
+ boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
+ return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
+ }
+
+ @VisibleForTesting
+ String computeChecksum(Path path) throws IOException {
+ InputStream is = localFs.open(path);
+ try {
+ return checksum.computeChecksum(is);
+ } finally {
+ try { is.close(); } catch (IOException ignore) {}
+ }
+ }
+
+ private String getTemporaryFileName(Path path) {
+ return path.getName() + "-" + randomTl.get().nextLong();
+ }
+
+ @VisibleForTesting
+ boolean notifySharedCacheManager(String checksumVal, String fileName)
+ throws IOException {
+ try {
+ NotifySCMRequest request =
+ recordFactory.newRecordInstance(NotifySCMRequest.class);
+ request.setResourceKey(checksumVal);
+ request.setFilename(fileName);
+ return scmClient.notify(request).getAccepted();
+ } catch (YarnException e) {
+ throw new IOException(e);
+ } catch (UndeclaredThrowableException e) {
+ // retrieve the cause of the exception and throw it as an IOException
+ throw new IOException(e.getCause() == null ? e : e.getCause());
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index a813e98..20e3569 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -642,7 +642,7 @@ public boolean matches(Object o) {
URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
- r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
return new SimpleEntry(name, rsrc);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index ed59ddd..c7ae2cc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -1645,7 +1645,7 @@ private static LocalResource getMockedResource(Random r,
URL url = getPath("/local/PRIVATE/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
- r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
return rsrc;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
new file mode 100644
index 0000000..1b2b2f0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestSharedCacheUploadService {
+
+  @Test
+  public void testInitDisabled() {
+    testInit(false);
+  }
+
+  @Test
+  public void testInitEnabled() {
+    testInit(true);
+  }
+
+  // helper: the service's enabled flag must mirror the configuration
+  private void testInit(boolean enabled) {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled);
+
+    SharedCacheUploadService service = new SharedCacheUploadService();
+    service.init(conf);
+    assertEquals(enabled, service.isEnabled());
+
+    service.stop();
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestUploader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestUploader.java
new file mode 100644
index 0000000..e9eea0e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestUploader.java
@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse;
+import org.junit.Test;
+
+public class TestUploader {
+
+ /**
+ * If verifyAccess fails, the upload should fail
+ */
+ @Test
+ public void testFailVerifyAccess() throws Exception {
+ Uploader spied = createSpiedUploader();
+ doReturn(false).when(spied).verifyAccess();
+
+ assertFalse(spied.call());
+ }
+
+ /**
+ * If rename fails, the upload should fail
+ */
+ @Test
+ public void testRenameFail() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ NMCacheUploaderSCMProtocol scmClient =
+ mock(NMCacheUploaderSCMProtocol.class);
+ NotifySCMResponse response = mock(NotifySCMResponse.class);
+ when(response.getAccepted()).thenReturn(true);
+ when(scmClient.notify(isA(NotifySCMRequest.class))).thenReturn(response);
+ FileSystem fs = mock(FileSystem.class);
+ // return false when rename is called
+ when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Uploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+ // stub verifyAccess() to return true
+ doReturn(true).when(spied).verifyAccess();
+ // stub getActualPath()
+ doReturn(localPath).when(spied).getActualPath();
+ // stub computeChecksum()
+ doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+ // stub uploadFile() to return true
+ doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+
+ assertFalse(spied.call());
+ }
+
+ /**
+ * If verifyAccess, uploadFile, rename, and notification succeed, the upload
+ * should succeed
+ */
+ @Test
+ public void testSuccess() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ NMCacheUploaderSCMProtocol scmClient =
+ mock(NMCacheUploaderSCMProtocol.class);
+ NotifySCMResponse response = mock(NotifySCMResponse.class);
+ when(response.getAccepted()).thenReturn(true);
+ when(scmClient.notify(isA(NotifySCMRequest.class))).thenReturn(response);
+ FileSystem fs = mock(FileSystem.class);
+    // stub rename to succeed (return true)
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Uploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+ // stub verifyAccess() to return true
+ doReturn(true).when(spied).verifyAccess();
+ // stub getActualPath()
+ doReturn(localPath).when(spied).getActualPath();
+ // stub computeChecksum()
+ doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+ // stub uploadFile() to return true
+ doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+ // stub notifySharedCacheManager to return true
+ doReturn(true).when(spied).notifySharedCacheManager(isA(String.class),
+ isA(String.class));
+
+ assertTrue(spied.call());
+ }
+
+  /**
+   * If verifyAccess, uploadFile, and rename succeed, but the SCM rejects the
+   * upload, the uploaded file should be deleted
+   */
+ @Test
+ public void testNotifySCMFail() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ FileSystem fs = mock(FileSystem.class);
+    // stub rename to succeed (return true)
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Uploader spied =
+ createSpiedUploader(resource, localPath, user, conf, null, fs,
+ localFs);
+ // stub verifyAccess() to return true
+ doReturn(true).when(spied).verifyAccess();
+ // stub getActualPath()
+ doReturn(localPath).when(spied).getActualPath();
+ // stub computeChecksum()
+ doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
+ // stub uploadFile() to return true
+ doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
+    // stub notifySharedCacheManager to return false (SCM rejects the upload)
+    doReturn(false).when(spied).notifySharedCacheManager(isA(String.class),
+        isA(String.class));
+
+ assertFalse(spied.call());
+ verify(fs).delete(isA(Path.class), anyBoolean());
+ }
+
+ /**
+ * If resource is public, verifyAccess should succeed
+ */
+ @Test
+ public void testVerifyAccessPublicResource() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ // give public visibility
+ when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+ Path localPath = mock(Path.class);
+ when(localPath.getName()).thenReturn("foo.jar");
+ String user = "joe";
+ NMCacheUploaderSCMProtocol scmClient =
+ mock(NMCacheUploaderSCMProtocol.class);
+ FileSystem fs = mock(FileSystem.class);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Uploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+
+ assertTrue(spied.verifyAccess());
+ }
+
+  /**
+   * If the localPath is a directory, getActualPath should return the file of
+   * the same name one level down
+   */
+ @Test
+ public void testGetActualPath() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ // give public visibility
+ when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
+ Path localPath = new Path("foo.jar");
+ String user = "joe";
+ NMCacheUploaderSCMProtocol scmClient =
+ mock(NMCacheUploaderSCMProtocol.class);
+ FileSystem fs = mock(FileSystem.class);
+ FileSystem localFs = mock(FileSystem.class);
+ // stub it to return a status that indicates a directory
+ FileStatus status = mock(FileStatus.class);
+ when(status.isDirectory()).thenReturn(true);
+ when(localFs.getFileStatus(localPath)).thenReturn(status);
+ Uploader spied =
+ createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+
+    Path actualPath = spied.getActualPath();
+    // JUnit convention: expected value first, actual second
+    assertEquals(localPath.getName(), actualPath.getName());
+    assertEquals(localPath.getName(), actualPath.getParent().getName());
+
+ private Uploader createSpiedUploader() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
+ LocalResource resource = mock(LocalResource.class);
+ Path localPath = mock(Path.class);
+ String user = "foo";
+ NMCacheUploaderSCMProtocol scmClient =
+ mock(NMCacheUploaderSCMProtocol.class);
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ return createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
+ localFs);
+ }
+
+ private Uploader createSpiedUploader(LocalResource resource, Path localPath,
+ String user, Configuration conf, NMCacheUploaderSCMProtocol scmClient,
+ FileSystem fs, FileSystem localFs)
+ throws IOException {
+ Uploader uploader = new Uploader(resource, localPath, user, conf, scmClient,
+ fs, localFs);
+ return spy(uploader);
+ }
+}