diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java index f14a136..2504e29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java @@ -55,6 +55,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type, resource.setSize(size); resource.setTimestamp(timestamp); resource.setPattern(pattern); + resource.setShouldBeUploadedToSharedCache(false); + return resource; + } + + @Public + @Stable + public static LocalResource newInstance(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp, + String pattern, boolean shouldBeUploadedToSharedCache) { + LocalResource resource = Records.newRecord(LocalResource.class); + resource.setResource(url); + resource.setType(type); + resource.setVisibility(visibility); + resource.setSize(size); + resource.setTimestamp(timestamp); + resource.setPattern(pattern); + resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); return resource; } @@ -62,7 +79,16 @@ public static LocalResource newInstance(URL url, LocalResourceType type, @Stable public static LocalResource newInstance(URL url, LocalResourceType type, LocalResourceVisibility visibility, long size, long timestamp) { - return newInstance(url, type, visibility, size, timestamp, null); + return newInstance(url, type, visibility, size, timestamp, false); + } + + @Public + @Stable + public static LocalResource newInstance(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp, + boolean shouldBeUploadedToSharedCache) { + return newInstance(url, type, visibility, size, timestamp, null, + 
shouldBeUploadedToSharedCache); } /** @@ -170,4 +196,23 @@ public static LocalResource newInstance(URL url, LocalResourceType type, @Public @Stable public abstract void setPattern(String pattern); + + /** + * NM uses it to decide whether it is necessary to upload the resource to + * the shared cache + */ + @Public + @Stable + public abstract boolean getShouldBeUploadedToSharedCache(); + + /** + * Inform NM whether upload to SCM is needed. + * + * @param shouldBeUploadedToSharedCache whether the resource needs to be + * uploaded to the shared cache + */ + @Public + @Stable + public abstract void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dced082..4544ed2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1304,6 +1304,12 @@ public static final String DEFAULT_SCM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT; + /** the checksum algorithm implementation **/ + public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL = + SHARED_CACHE_PREFIX + "checksum.algo.impl"; + public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL = + "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 48aac9d..9ffb2df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -159,6 +159,7 @@ message LocalResourceProto { optional LocalResourceTypeProto type = 4; optional LocalResourceVisibilityProto visibility = 5; optional string pattern = 6; + optional bool should_be_uploaded_to_shared_cache = 7; } message ApplicationResourceUsageReportProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java index 16bd597..560b081 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java @@ -192,6 +192,26 @@ public synchronized void setPattern(String pattern) { builder.setPattern(pattern); } + @Override + public synchronized boolean getShouldBeUploadedToSharedCache() { + LocalResourceProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasShouldBeUploadedToSharedCache()) { + return false; + } + return p.getShouldBeUploadedToSharedCache(); + } + + @Override + public synchronized void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache) { + maybeInitBuilder(); + if (!shouldBeUploadedToSharedCache) { + builder.clearShouldBeUploadedToSharedCache(); + return; + } + builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); + } + private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) { return ProtoUtils.convertToProtoFormat(e); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java new file mode 100644 index 0000000..2807d95 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/Checksum.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An interface to calculate a checksum. 
The checksum implementation should be + * thread safe. + */ +public interface Checksum { + + /** + * Calculate the checksum of the passed input stream. + * + * @param in InputStream to be checksummed + * @return the message digest of the input stream + * @throws IOException + */ + public String computeChecksum(InputStream in) throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java new file mode 100644 index 0000000..d904911 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A factory class for creating checksum objects based on a configurable + * algorithm implementation + */ +public class ChecksumFactory { + private static final ConcurrentMap<String,Checksum> instances = + new ConcurrentHashMap<String,Checksum>(); + + /** + * Get the Checksum object for the configurable algorithm implementation + * (see yarn.sharedcache.checksum.algo.impl); one instance per class name. + * @param conf configuration to read the implementation class name from + * @return Checksum object + */ + public static Checksum getChecksum(Configuration conf) { + String className = + conf.get(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL, + YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); + Checksum checksum = instances.get(className); + if (checksum == null) { + try { + Class<?> clazz = Class.forName(className); + checksum = (Checksum) clazz.newInstance(); + Checksum old = instances.putIfAbsent(className, checksum); + if (old != null) { + checksum = old; + } + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + return checksum; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java new file mode 100644 index 0000000..f25b717 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.codec.digest.DigestUtils; + +/** + * SHA-256 implementation of the shared cache {@link Checksum} interface. + */ +public class ChecksumSHA256Impl implements Checksum { + public String computeChecksum(InputStream in) throws IOException { + return DigestUtils.sha256Hex(in); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheClient.java new file mode 100644 index 0000000..459b689 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheClient.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Records; + +/** + * This is the client for YARN's shared cache. + */ +public class SharedCacheClient extends AbstractService { + private static final Log LOG = LogFactory + .getLog(SharedCacheClient.class); + + private ClientSCMProtocol scmClient; + private InetSocketAddress scmAddress; + private Configuration conf; + private Checksum checksumAlg; + + // If scm isn't available, we will mark this instance + // of SharedCacheClient unusable. 
This is useful when + // the caller of SharedCacheClient needs to call the same + // instance of SharedCacheClient multiple times; it allows + // the caller to quickly fall back to the non-SCM approach. + private volatile boolean scmAvailable = false; + + private static final String ROOT = "root"; + + public SharedCacheClient() { + this(null); + } + + public SharedCacheClient(InetSocketAddress scmAddress) { + super(SharedCacheClient.class.getName()); + this.scmAddress = scmAddress; + } + + private static InetSocketAddress getScmAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADDRESS, + YarnConfiguration.DEFAULT_SCM_PORT); + } + + @Override + protected synchronized void serviceInit(Configuration conf) throws Exception { + if (this.scmAddress == null) { + this.scmAddress = getScmAddress(conf); + } + this.conf = conf; + this.checksumAlg = ChecksumFactory.getChecksum(conf); + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStart() throws Exception { + YarnRPC rpc = YarnRPC.create(getConfig()); + + this.scmClient = (ClientSCMProtocol) rpc.getProxy( + ClientSCMProtocol.class, this.scmAddress, getConfig()); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to Shared Cache Manager at " + this.scmAddress); + } + scmAvailable = true; + super.serviceStart(); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (this.scmClient != null) { + RPC.stopProxy(this.scmClient); + } + super.serviceStop(); + } + + public boolean isScmAvailable() { + return this.scmAvailable; + } + + public Path use(ApplicationId applicationId, Path sourceFile) + throws IOException { + + // If for whatever reason, we can't even calculate checksum for + // local resource, something is really wrong with the file system; + // even non-SCM approach won't work. Let us just throw the exception. 
+ String checksum = getFileChecksum(sourceFile); + return use(applicationId, checksum); + } + + public Path use(ApplicationId applicationId, String resourceKey) { + + Path resourcePath = null; + UseSharedCacheResourceRequest request = Records.newRecord( + UseSharedCacheResourceRequest.class); + request.setAppId(applicationId); + request.setResourceKey(resourceKey); + try { + UseSharedCacheResourceResponse response = this.scmClient.use(request); + if (response != null && response.getPath() != null) { + resourcePath = new Path(response.getPath()); + } + } catch (Exception e) { + // Just catching IOException isn't enough. + // RPC call can throw ConnectionException. + // We don't handle different exceptions separately at this point. + // Pass the exception to the logger so the stack trace is kept in the log. + LOG.warn("SCM might be down. The exception is " + e.getMessage(), e); + scmAvailable = false; + } + return resourcePath; + } + + public void release(ApplicationId applicationId, String resourceKey) { + if (!scmAvailable) { + return; + } + ReleaseSharedCacheResourceRequest request = Records.newRecord( + ReleaseSharedCacheResourceRequest.class); + request.setAppId(applicationId); + request.setResourceKey(resourceKey); + try { + ReleaseSharedCacheResourceResponse response = this.scmClient.release(request); + } catch (Exception e) { + // Just catching IOException isn't enough. + // RPC call can throw ConnectionException. + // Pass the exception to the logger so the stack trace is kept in the log. + LOG.warn("SCM might be down. The exception is " + e.getMessage(), e); + scmAvailable = false; + } + } + + + /** + * Calculates the checksum of a given file with the configured algorithm + * (SHA-256 by default). 
+ * + * @return the checksum string computed by the configured algorithm + * @throws IOException if the file cannot be opened or read + */ + public String getFileChecksum(Path sourceFile) + throws IOException { + FileSystem fs = sourceFile.getFileSystem(this.conf); + FSDataInputStream in = null; + try { + in = fs.open(sourceFile); + return this.checksumAlg.computeChecksum(in); + } finally { + if (in != null) { + in.close(); + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java index 70bead7..65a331a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java @@ -20,6 +20,8 @@ import java.net.URISyntaxException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -30,6 +32,8 @@ public class LocalResourceRequest extends LocalResource implements Comparable { + private static final Log LOG = LogFactory.getLog(LocalResourceRequest.class); + private final Path loc; private final long timestamp; private final LocalResourceType type; @@ -152,6 +156,17 @@ public String getPattern() { } @Override + public boolean getShouldBeUploadedToSharedCache() { + throw new UnsupportedOperationException(); + } + + @Override + public void setShouldBeUploadedToSharedCache( + 
boolean shouldBeUploadedToSharedCache) { + throw new UnsupportedOperationException(); + } + + @Override public void setResource(URL resource) { throw new UnsupportedOperationException(); }