diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
index f14a136..2504e29 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
@@ -55,6 +55,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
resource.setSize(size);
resource.setTimestamp(timestamp);
resource.setPattern(pattern);
+ resource.setShouldBeUploadedToSharedCache(false);
+ return resource;
+ }
+
+ @Public
+ @Stable
+ public static LocalResource newInstance(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp,
+ String pattern, boolean shouldBeUploadedToSharedCache) {
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ resource.setResource(url);
+ resource.setType(type);
+ resource.setVisibility(visibility);
+ resource.setSize(size);
+ resource.setTimestamp(timestamp);
+ resource.setPattern(pattern);
+ resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
@@ -62,7 +79,16 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Stable
public static LocalResource newInstance(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp) {
- return newInstance(url, type, visibility, size, timestamp, null);
+ return newInstance(url, type, visibility, size, timestamp, false);
+ }
+
+ @Public
+ @Stable
+ public static LocalResource newInstance(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp,
+ boolean shouldBeUploadedToSharedCache) {
+ return newInstance(url, type, visibility, size, timestamp,
+ shouldBeUploadedToSharedCache);
}
/**
@@ -170,4 +196,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Public
@Stable
public abstract void setPattern(String pattern);
+
+ /**
+ * NM uses it to decide whether if it is necessary to upload the resource
+ * shared cache
+ */
+ @Public
+ @Stable
+ public abstract boolean getShouldBeUploadedToSharedCache();
+
+ /**
+ * Inform NM whether upload to SCM is needed.
+ *
+ * @param shouldBeUploadedToSharedCache shouldBeUploadedToSharedCache
+ * of this request
+ */
+ @Public
+ @Stable
+ public abstract void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index dced082..4544ed2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1304,6 +1304,12 @@
public static final String DEFAULT_SCM_WEBAPP_ADDRESS = "0.0.0.0:"
+ DEFAULT_SCM_WEBAPP_PORT;
+ /** the checksum algorithm implementation **/
+ public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+ SHARED_CACHE_PREFIX + "checksum.algo.impl";
+ public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL =
+ "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl";
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 48aac9d..9ffb2df 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -159,6 +159,7 @@ message LocalResourceProto {
optional LocalResourceTypeProto type = 4;
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
+ optional bool should_be_uploaded_to_shared_cache = 7;
}
message ApplicationResourceUsageReportProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
index 16bd597..560b081 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
@@ -192,6 +192,26 @@ public synchronized void setPattern(String pattern) {
builder.setPattern(pattern);
}
+ @Override
+ public synchronized boolean getShouldBeUploadedToSharedCache() {
+ LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasShouldBeUploadedToSharedCache()) {
+ return false;
+ }
+ return p.getShouldBeUploadedToSharedCache();
+ }
+
+ @Override
+ public synchronized void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache) {
+ maybeInitBuilder();
+ if (!shouldBeUploadedToSharedCache) {
+ builder.clearShouldBeUploadedToSharedCache();
+ return;
+ }
+ builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
+ }
+
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java
new file mode 100644
index 0000000..2807d95
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interface to calculate a checksum. The checksum implementation should be
+ * thread safe.
+ */
+public interface Checksum {
+
+ /**
+ * Calculate the checksum of the passed input stream.
+ *
+ * @param in InputStream to be checksumed
+ * @return the message digest of the input stream
+ * @throws IOException
+ */
+ public String computeChecksum(InputStream in) throws IOException;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java
new file mode 100644
index 0000000..d904911
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * A factory class for creating checksum objects based on a configurable
+ * algorithm implementation
+ */
+public class ChecksumFactory {
+ private static final ConcurrentMap instances =
+ new ConcurrentHashMap();
+
+ /**
+ * Get a new Checksum object based on the configurable algorithm
+ * implementation (see yarn.sharedcache.checksum.algo.impl)
+ *
+ * @return Checksum object
+ */
+ public static Checksum getChecksum(Configuration conf) {
+ String className =
+ conf.get(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+ Checksum checksum = instances.get(className);
+ if (checksum == null) {
+ try {
+ Class> clazz = Class.forName(className);
+ checksum = (Checksum) clazz.newInstance();
+ Checksum old = instances.putIfAbsent(className, checksum);
+ if (old != null) {
+ checksum = old;
+ }
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ return checksum;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
new file mode 100644
index 0000000..f25b717
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+/**
+ * The SHA-256 implementation of the shared cache checksum interface.
+ */
+public class ChecksumSHA256Impl implements Checksum {
+ public String computeChecksum(InputStream in) throws IOException {
+ return DigestUtils.sha256Hex(in);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheClient.java
new file mode 100644
index 0000000..459b689
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheClient.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sharedcache;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This is the client for YARN's shared cache.
+ */
+public class SharedCacheClient extends AbstractService {
+ private static final Log LOG = LogFactory
+ .getLog(SharedCacheClient.class);
+
+ private ClientSCMProtocol scmClient;
+ private InetSocketAddress scmAddress;
+ private Configuration conf;
+ private Checksum checksumAlg;
+
+ // If scm isn't available, we will mark this instance
+ // of SharedCacheClient unusable. This is useful when
+ // the caller of SharedCacheClient needs to call the same
+ // instance of SharedCacheClient multiple times; it allows
+ // the caller to quickly fall back to the non-SCM approach.
+ private volatile boolean scmAvailable = false;
+
+ private static final String ROOT = "root";
+
+ public SharedCacheClient() {
+ this(null);
+ }
+
+ public SharedCacheClient(InetSocketAddress scmAddress) {
+ super(SharedCacheClient.class.getName());
+ this.scmAddress = scmAddress;
+ }
+
+ private static InetSocketAddress getScmAddress(Configuration conf) {
+ return conf.getSocketAddr(YarnConfiguration.SCM_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_ADDRESS,
+ YarnConfiguration.DEFAULT_SCM_PORT);
+ }
+
+ @Override
+ protected synchronized void serviceInit(Configuration conf) throws Exception {
+ if (this.scmAddress == null) {
+ this.scmAddress = getScmAddress(conf);
+ }
+ this.conf = conf;
+ this.checksumAlg = ChecksumFactory.getChecksum(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected synchronized void serviceStart() throws Exception {
+ YarnRPC rpc = YarnRPC.create(getConfig());
+
+ this.scmClient = (ClientSCMProtocol) rpc.getProxy(
+ ClientSCMProtocol.class, this.scmAddress, getConfig());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to Shared Cache Manager at " + this.scmAddress);
+ }
+ scmAvailable = true;
+ super.serviceStart();
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ if (this.scmClient != null) {
+ RPC.stopProxy(this.scmClient);
+ }
+ super.serviceStop();
+ }
+
+ public boolean isScmAvailable() {
+ return this.scmAvailable;
+ }
+
+ public Path use(ApplicationId applicationId, Path sourceFile)
+ throws IOException {
+
+ // If for whatever reason, we can't even calculate checksum for
+ // local resource, something is really wrong with the file system;
+ // even non-SCM approach won't work. Let us just throw the exception.
+ String checksum = getFileChecksum(sourceFile);
+ return use(applicationId, checksum);
+ }
+
+ public Path use(ApplicationId applicationId, String resourceKey) {
+
+ Path resourcePath = null;
+ UseSharedCacheResourceRequest request = Records.newRecord(
+ UseSharedCacheResourceRequest.class);
+ request.setAppId(applicationId);
+ request.setResourceKey(resourceKey);
+ try {
+ UseSharedCacheResourceResponse response = this.scmClient.use(request);
+ if (response != null && response.getPath() != null) {
+ resourcePath = new Path(response.getPath());
+ }
+ } catch (Exception e) {
+ // Just catching IOException isn't enough.
+ // RPC call can throw ConnectionException.
+ // We don't handle different exceptions separately at this point.
+ LOG.warn("SCM might be down. The exception is " + e.getMessage());
+ e.printStackTrace();
+ scmAvailable = false;
+ }
+ return resourcePath;
+ }
+
+ public void release(ApplicationId applicationId, String resourceKey) {
+ if (!scmAvailable) {
+ return;
+ }
+ ReleaseSharedCacheResourceRequest request = Records.newRecord(
+ ReleaseSharedCacheResourceRequest.class);
+ request.setAppId(applicationId);
+ request.setResourceKey(resourceKey);
+ try {
+ ReleaseSharedCacheResourceResponse response = this.scmClient.release(request);
+ } catch (Exception e) {
+ // Just catching IOException isn't enough.
+ // RPC call can throw ConnectionException.
+ LOG.warn("SCM might be down. The exception is " + e.getMessage());
+ e.printStackTrace();
+ scmAvailable = false;
+ }
+ }
+
+
+ /**
+ * Calculates the SHA-256 checksum for a given file and verifies the file
+ * length.
+ *
+ * @return A hex string containing the SHA-256 digest
+ * @throws IOException
+ */
+ public String getFileChecksum(Path sourceFile)
+ throws IOException {
+ FileSystem fs = sourceFile.getFileSystem(this.conf);
+ FSDataInputStream in = null;
+ try {
+ in = fs.open(sourceFile);
+ return this.checksumAlg.computeChecksum(in);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
index 70bead7..65a331a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
@@ -20,6 +20,8 @@
import java.net.URISyntaxException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -30,6 +32,8 @@
public class LocalResourceRequest
extends LocalResource implements Comparable {
+ private static final Log LOG = LogFactory.getLog(LocalResourceRequest.class);
+
private final Path loc;
private final long timestamp;
private final LocalResourceType type;
@@ -152,6 +156,17 @@ public String getPattern() {
}
@Override
+ public boolean getShouldBeUploadedToSharedCache() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setShouldBeUploadedToSharedCache(
+ boolean shouldBeUploadedToSharedCache) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void setResource(URL resource) {
throw new UnsupportedOperationException();
}