diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
index e243e9051fb..c556419157c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
@@ -211,4 +211,24 @@ public static LocalResource newInstance(URL url, LocalResourceType type,
@Unstable
public abstract void setShouldBeUploadedToSharedCache(
boolean shouldBeUploadedToSharedCache);
+
+ /**
+ * Get the image name.
+ *
+ * @return image name when the type of resource is
+ * {@link LocalResourceType#DOCKER_IMAGE}; null otherwise.
+ */
+ @Public
+ @Unstable
+ public abstract String getImageName();
+
+ /**
+ * Sets the docker image name. This should be set when the resource type is
+ * {@link LocalResourceType#DOCKER_IMAGE}.
+ *
+ * @param imageName docker image name.
+ */
+ @Public
+ @Unstable
+ public abstract void setImageName(String imageName);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java
index 1552cdf8cc3..b7417e9ef1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java
@@ -38,6 +38,9 @@
*
* {@link #PATTERN} - A hybrid between {@link #ARCHIVE} and {@link #FILE}.
*
+ *
+ * {@link #DOCKER_IMAGE} - Docker image.
+ *
*
*
* @see LocalResource
@@ -66,5 +69,10 @@
* in #{@link LocalResource}. Currently only jars support pattern, all
* others will be treated like a #{@link LocalResourceType#ARCHIVE}.
*/
- PATTERN
+ PATTERN,
+
+ /**
+ * Docker Image.
+ */
+ DOCKER_IMAGE
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 4573859d384..17c99ef6ee1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -235,6 +235,7 @@ message LocalResourceProto {
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
optional bool should_be_uploaded_to_shared_cache = 7;
+ optional string image_name = 8;
}
message StringLongMapProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
index 84c3b6e020d..a1fbdd54445 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java
@@ -135,6 +135,15 @@ public void setShouldBeUploadedToSharedCache(
boolean shouldBeUploadedToSharedCache) {
}
+
+ @Override
+ public String getImageName() {
+ return null;
+ }
+
+ @Override
+ public void setImageName(String imageName) {
+ }
});
// Initialize list of files.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
index 560b081c016..69898db0939 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java
@@ -212,6 +212,25 @@ public synchronized void setShouldBeUploadedToSharedCache(
builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
}
+ @Override
+ public synchronized String getImageName() {
+ LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasImageName()) {
+ return null;
+ }
+ return p.getImageName();
+ }
+
+ @Override
+ public void setImageName(String imageName) {
+ maybeInitBuilder();
+ if (imageName == null) {
+ builder.clearImageName();
+ return;
+ }
+ builder.setImageName(imageName);
+ }
+
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ImageLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ImageLocalizer.java
new file mode 100644
index 00000000000..91900760a3a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ImageLocalizer.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerImagesCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerPullCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Docker image localizer.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ImageLocalizer extends CompositeService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ImageLocalizer.class);
+
+ private ExecutorService imagePuller =
+ HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat("ImagePuller #%d").build());
+ private ScheduledExecutorService statusChecker = HadoopExecutors.
+ newScheduledThreadPool(10);
+
+ private final Multimap imageToLocalizers =
+ ArrayListMultimap.create();
+
+ private final Map downloading = new HashMap<>();
+ private final Map readOnlyDownloading;
+ private final Set localImages = new HashSet<>();
+ private final Map downloaded = new HashMap<>();
+ private final Map failed = new HashMap<>();
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock;
+ private final ReentrantReadWriteLock.WriteLock writeLock;
+
+ private final Configuration conf;
+ private final Context nmContext;
+ private final ResourceLocalizationService rsrcLclzationService;
+ private final RecordFactory recordFactory;
+
+ public ImageLocalizer(Configuration conf,
+ ResourceLocalizationService rsrcLclzationService,
+ Context nmContext) {
+ super(ImageLocalizer.class.getName());
+ this.conf = Preconditions.checkNotNull(conf, "configuration");
+ this.nmContext = Preconditions.checkNotNull(nmContext, "nm context");
+ this.rsrcLclzationService = Preconditions.checkNotNull(rsrcLclzationService,
+ "resource localization service");
+ recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+ readOnlyDownloading = Collections.unmodifiableMap(downloading);
+
+ this.readLock = lock.readLock();
+ this.writeLock = lock.writeLock();
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ findLocalImages();
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ statusChecker.scheduleAtFixedRate(new ImageStatusRetriever(this),
+ STATUS_CHECKER_TIME_INTERVAL, STATUS_CHECKER_TIME_INTERVAL,
+ TimeUnit.SECONDS);
+ }
+
+ private void findLocalImages() {
+ try {
+ PrivilegedOperationExecutor privOpExecutor =
+ PrivilegedOperationExecutor.getInstance(conf);
+ DockerImagesCommand imagesCommand = new DockerImagesCommand();
+ String output = DockerCommandExecutor.executeDockerCommand(
+ imagesCommand, Long.toHexString(System.currentTimeMillis()),
+ null, privOpExecutor, false, nmContext);
+ if (!Strings.isNullOrEmpty(output)) {
+ String[] lines = output.split(System.getProperty("line.separator"));
+ localImages.addAll(Arrays.asList(lines));
+ }
+ } catch (ContainerExecutionException e) {
+ LOG.warn("Unable to find existing images", e);
+ }
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ statusChecker.shutdown();
+ imagePuller.shutdown();
+ super.serviceStop();
+ }
+
+ public void localize(String localizerId, LocalResource localResource) {
+ try {
+ writeLock.lock();
+ if (isPresent(localResource)) {
+ return;
+ }
+ imageToLocalizers.put(localResource.getImageName(), localizerId);
+
+ if (!downloading.containsKey(localResource.getImageName())) {
+ downloading.put(localResource.getImageName(), localResource);
+ imagePuller.submit(new ImageLocalizationTask(this, localResource));
+ } else {
+ LOG.info("{} is already downloading", localResource.getImageName());
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private boolean isPresent(LocalResource localResource) {
+ if (localImages.contains(localResource.getImageName())) {
+ LOG.info("{} is already present", localResource.getImageName());
+ return true;
+ }
+ if (downloaded.containsKey(localResource.getImageName())) {
+ LOG.info("{} already downloaded", localResource.getImageName());
+ return true;
+ }
+ return false;
+ }
+
+ private void completedDownload(List completedResources) {
+ try {
+ writeLock.lock();
+ completedResources.forEach(completed -> {
+ downloading.remove(completed.getImageName());
+ downloaded.put(completed.getImageName(), completed);
+ });
+ postCompleted(completedResources);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void failedDownload(LocalResource localResource,
+ SerializedException ex) {
+ try {
+ writeLock.lock();
+ downloading.remove(localResource.getImageName());
+ failed.put(localResource.getImageName(), localResource);
+ postFailed(localResource, ex);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void postCompleted(List resources) {
+ final Multimap statusesPerLocalizer =
+ ArrayListMultimap.create();
+
+ resources.forEach(resource -> {
+ LocalResourceStatus stat =
+ recordFactory.newRecordInstance(LocalResourceStatus.class);
+ stat.setResource(resource);
+ stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
+ Collection localizers = imageToLocalizers.removeAll(
+ resource.getImageName());
+ if (localizers != null) {
+ localizers.forEach(localizerId ->
+ statusesPerLocalizer.put(localizerId, stat));
+ }
+ });
+
+ statusesPerLocalizer.asMap().forEach((localizerId,
+ localResourceStatuses) -> {
+ LocalizerStatus localizerStatus =
+ recordFactory.newRecordInstance(LocalizerStatus.class);
+ localizerStatus.setLocalizerId(localizerId);
+ localResourceStatuses.forEach(localizerStatus::addResourceStatus);
+ rsrcLclzationService.processImageLocalizerResponse(localizerStatus);
+ });
+ }
+
+ private void postFailed(LocalResource resource,
+ SerializedException ex) {
+ final Map statusesPerLocalizer =
+ new HashMap<>();
+
+ LocalResourceStatus stat =
+ recordFactory.newRecordInstance(LocalResourceStatus.class);
+ stat.setResource(resource);
+ stat.setStatus(ResourceStatusType.FETCH_FAILURE);
+ stat.setException(ex);
+
+ Collection localizers = imageToLocalizers.removeAll(
+ resource.getImageName());
+ if (localizers != null) {
+ localizers.forEach(localizerId ->
+ statusesPerLocalizer.put(localizerId, stat));
+ }
+ statusesPerLocalizer.forEach((localizerId,
+ localResourceStatus) -> {
+ LocalizerStatus localizerStatus =
+ recordFactory.newRecordInstance(LocalizerStatus.class);
+ localizerStatus.setLocalizerId(localizerId);
+ localizerStatus.addResourceStatus(localResourceStatus);
+ rsrcLclzationService.processImageLocalizerResponse(localizerStatus);
+ });
+ }
+
+ private Map getDownloading() {
+ return readOnlyDownloading;
+ }
+
+ private static class ImageLocalizationTask implements Runnable {
+
+ private final ImageLocalizer localizer;
+ private final LocalResource localResource;
+
+ ImageLocalizationTask(ImageLocalizer localizer,
+ LocalResource localResource) {
+ this.localizer = Preconditions.checkNotNull(localizer, "localizer");
+ this.localResource = Preconditions.checkNotNull(localResource,
+ "local resource");
+ }
+
+ @Override
+ public void run() {
+ try {
+ PrivilegedOperationExecutor privOpExecutor =
+ PrivilegedOperationExecutor.getInstance(localizer.conf);
+ LOG.info("Localize docker image : {}", localResource.getImageName());
+
+ DockerPullCommand pullCommand = new DockerPullCommand(
+ localResource.getImageName());
+ String output = DockerCommandExecutor.executeDockerCommand(
+ pullCommand, Long.toHexString(System.currentTimeMillis()),
+ null, privOpExecutor, false, localizer.nmContext);
+ if (output.contains(ERROR_RESPONSE)) {
+ LOG.error("Unable to localize docker image : {}", output);
+ localizer.failedDownload(localResource,
+ SerializedException.newInstance(new YarnException(output)));
+ }
+ } catch (ContainerExecutionException e) {
+ LOG.warn("Unable to localize docker image : {}",
+ localResource.getImageName());
+ localizer.failedDownload(localResource,
+ SerializedException.newInstance(e.getCause()));
+ }
+ }
+ }
+
+ private static class ImageStatusRetriever implements Runnable {
+
+ private final ImageLocalizer localizer;
+
+ ImageStatusRetriever(ImageLocalizer localizer) {
+ this.localizer = Preconditions.checkNotNull(localizer, "localizer");
+ }
+
+ @Override
+ public void run() {
+ List completed = new ArrayList<>();
+ localizer.getDownloading().forEach((imageName, localResource) -> {
+ try {
+ PrivilegedOperationExecutor privOpExecutor =
+ PrivilegedOperationExecutor.getInstance(localizer.conf);
+ DockerImagesCommand imagesCommand = new DockerImagesCommand()
+ .getSingleImageStatus(localResource.getImageName());
+ String output = DockerCommandExecutor.executeDockerCommand(
+ imagesCommand, Long.toHexString(System.currentTimeMillis()),
+ null, privOpExecutor, false, localizer.nmContext);
+ if (output != null) {
+ output = output.trim();
+ }
+ if (!Strings.isNullOrEmpty(output)) {
+ completed.add(localResource);
+ }
+ } catch (ContainerExecutionException e) {
+ LOG.warn("Unable to find existing image {}",
+ localResource.getImageName(), e);
+ }
+ });
+
+ if (!completed.isEmpty()) {
+ localizer.completedDownload(completed);
+ }
+ }
+ }
+
+ private static final String ERROR_RESPONSE = "Error response from daemon";
+ private static int STATUS_CHECKER_TIME_INTERVAL = 1;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
index d2e8e22d459..954c5c6e79e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
@@ -19,13 +19,13 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.net.URISyntaxException;
+import java.util.Objects;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.ConverterUtils;
public class LocalResourceRequest
extends LocalResource implements Comparable {
@@ -35,6 +35,7 @@
private final LocalResourceType type;
private final LocalResourceVisibility visibility;
private final String pattern;
+ private final String imageName;
/**
* Wrap API resource to match against cache of localized resources.
@@ -43,20 +44,24 @@
*/
public LocalResourceRequest(LocalResource resource)
throws URISyntaxException {
- this(resource.getResource().toPath(),
+ this(!resource.getType().equals(LocalResourceType.DOCKER_IMAGE) ?
+ resource.getResource().toPath() : null,
resource.getTimestamp(),
resource.getType(),
resource.getVisibility(),
- resource.getPattern());
+ resource.getPattern(),
+ resource.getImageName());
}
LocalResourceRequest(Path loc, long timestamp, LocalResourceType type,
- LocalResourceVisibility visibility, String pattern) {
+ LocalResourceVisibility visibility, String pattern,
+ String imageName) {
this.loc = loc;
this.timestamp = timestamp;
this.type = type;
this.visibility = visibility;
this.pattern = pattern;
+ this.imageName = imageName;
}
@Override
@@ -67,6 +72,9 @@ public int hashCode() {
if(pattern != null) {
hash = hash ^ pattern.hashCode();
}
+ if (imageName != null) {
+ hash = hash ^ imageName.hashCode();
+ }
return hash;
}
@@ -86,7 +94,8 @@ public boolean equals(Object o) {
return getPath().equals(other.getPath()) &&
getTimestamp() == other.getTimestamp() &&
getType() == other.getType() &&
- patternEquals;
+ patternEquals &&
+ Objects.equals(imageName, other.imageName);
}
@Override
@@ -111,6 +120,17 @@ public int compareTo(LocalResourceRequest other) {
} else {
ret = pattern.compareTo(otherPattern);
}
+
+ if (0 == ret) {
+ if (imageName == null && other.imageName != null) {
+ ret = -1;
+ } else if (imageName != null && other.imageName == null) {
+ ret = 1;
+ } else {
+ ret = Objects.compare(imageName, other.imageName,
+ String::compareTo);
+ }
+ }
}
}
}
@@ -191,7 +211,17 @@ public void setVisibility(LocalResourceVisibility visibility) {
public void setPattern(String pattern) {
throw new UnsupportedOperationException();
}
-
+
+ @Override
+ public String getImageName() {
+ return imageName;
+ }
+
+ @Override
+ public void setImageName(String imageName) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -199,7 +229,8 @@ public String toString() {
sb.append(getPath().toString()).append(", ");
sb.append(getTimestamp()).append(", ");
sb.append(getType()).append(", ");
- sb.append(getPattern()).append(" }");
+ sb.append(getPattern()).append(", ");
+ sb.append(getImageName()).append(" }");
return sb.toString();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 8944ba9d6ed..8de988511ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -20,6 +20,8 @@
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -182,6 +184,7 @@
private DirsChangeListener logDirsChangeListener;
private Context nmContext;
private DiskValidator diskValidator;
+ private ImageLocalizer imageLocalizer;
/**
* Map of LocalResourceTrackers keyed by username, for private
@@ -292,6 +295,22 @@ public void onDirsChanged() {
initializeLogDirs(lfs);
}
};
+
+ boolean isDockerRuntimeEnabled = false;
+ String[] configuredRuntimes = conf.getTrimmedStrings(
+ YarnConfiguration.LINUX_CONTAINER_RUNTIME_ALLOWED_RUNTIMES,
+ YarnConfiguration.DEFAULT_LINUX_CONTAINER_RUNTIME_ALLOWED_RUNTIMES);
+ for (String runtime : configuredRuntimes) {
+ if (LinuxContainerRuntimeConstants.RuntimeType.DOCKER.name()
+ .equals(runtime.toUpperCase())) {
+ isDockerRuntimeEnabled = true;
+ break;
+ }
+ }
+ if (isDockerRuntimeEnabled) {
+ this.imageLocalizer = new ImageLocalizer(conf, this, nmContext);
+ imageLocalizer.serviceInit(conf);
+ }
super.serviceInit(conf);
}
@@ -378,6 +397,10 @@ public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
return localizerTracker.processHeartbeat(status);
}
+ public void processImageLocalizerResponse(LocalizerStatus status) {
+ localizerTracker.processImageLocalizerResponse(status);
+ }
+
@Override
public void serviceStart() throws Exception {
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
@@ -393,6 +416,9 @@ public void serviceStart() throws Exception {
super.serviceStart();
dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
+ if (imageLocalizer != null) {
+ imageLocalizer.serviceStart();
+ }
}
LocalizerTracker createLocalizerTracker(Configuration conf) {
@@ -429,6 +455,9 @@ public void serviceStop() throws Exception {
server.stop();
}
cacheCleanup.shutdown();
+ if (imageLocalizer != null) {
+ imageLocalizer.serviceStop();
+ }
super.serviceStop();
}
@@ -760,7 +789,17 @@ public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
return localizer.processHeartbeat(status.getResources());
}
}
-
+
+ public void processImageLocalizerResponse(LocalizerStatus status) {
+ String locId = status.getLocalizerId();
+ synchronized (privLocalizers) {
+ LocalizerRunner localizer = privLocalizers.get(locId);
+ if (localizer != null) {
+ localizer.processImageLocalizerResults(status.getResources());
+ }
+ }
+ }
+
@Override
public void serviceStop() throws Exception {
for (LocalizerRunner localizer : privLocalizers.values()) {
@@ -1043,7 +1082,13 @@ public void run() {
}
public void addResource(LocalizerResourceRequestEvent request) {
- pending.add(request);
+ if (request.getResource().getRequest().getType().equals(
+ LocalResourceType.DOCKER_IMAGE)) {
+ imageLocalizer.localize(localizerId,
+ request.getResource().getRequest());
+ } else {
+ pending.add(request);
+ }
}
public void endContainerLocalization() {
@@ -1208,6 +1253,48 @@ LocalizerHeartbeatResponse processHeartbeat(
return response;
}
+ void processImageLocalizerResults(List statuses) {
+ LocalizerHeartbeatResponse response =
+ recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
+ String user = context.getUser();
+ ApplicationId applicationId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
+
+ // Update resource statuses.
+ for (LocalResourceStatus stat : statuses) {
+ LocalResource rsrc = stat.getResource();
+ LocalResourceRequest rsrcReq = null;
+ try {
+ rsrcReq = new LocalResourceRequest(rsrc);
+ } catch (URISyntaxException e) {
+ LOG.error("while creating rsrc req {}", rsrc.getImageName(), e);
+ continue;
+ }
+ LocalResourcesTracker tracker = getLocalResourcesTracker(
+ rsrc.getVisibility(), user, applicationId);
+ if (tracker == null) {
+ continue;
+ }
+ switch (stat.getStatus()) {
+ case FETCH_SUCCESS:
+ tracker.handle(new ResourceLocalizedEvent(rsrcReq,
+ null, stat.getLocalSize()));
+ break;
+ case FETCH_FAILURE:
+ final String diagnostics = stat.getException().toString();
+ tracker.handle(new ResourceFailedLocalizationEvent(rsrcReq,
+ diagnostics));
+ break;
+ default:
+ LOG.info("Unknown status {} : {}", rsrc.getImageName(),
+ stat.getStatus());
+ tracker.handle(new ResourceFailedLocalizationEvent(rsrcReq,
+ stat.getException().getMessage()));
+ break;
+ }
+ }
+ }
+
private Path getPathForLocalization(LocalResource rsrc,
LocalResourcesTracker tracker) throws IOException, URISyntaxException {
String user = context.getUser();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
index 9f5b23c0d25..5b602e375a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java
@@ -212,7 +212,7 @@ private LocalResourceRequest addResource(
private LocalResourceRequest createLocalResourceRequest(String path,
long timestamp) {
return new LocalResourceRequest(new Path(path), timestamp,
- LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null, null);
}
private LocalizedResource createLocalizedResource(long size, int refCount,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 15d9e2cdb7b..2703337d55c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -979,7 +979,7 @@ private LocalResourceRequest createLocalResourceRequest(String user, int i,
long ts, LocalResourceVisibility vis) {
final LocalResourceRequest req =
new LocalResourceRequest(new Path("file:///tmp/" + user + "/rsrc" + i),
- ts + i * 2000, LocalResourceType.FILE, vis, null);
+ ts + i * 2000, LocalResourceType.FILE, vis, null, null);
return req;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 9f5f608fedb..258665db2db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -2000,7 +2000,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
LocalResourceRequest req =
new LocalResourceRequest(new Path("file:///tmp"), 123L,
- LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
+ LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "", null);
// We need to pre-populate the LocalizerRunner as the
// Resource Localization Service code internally starts them which
@@ -2170,14 +2170,15 @@ public void testLocalResourcePath() throws Exception {
// 2) Application resource
LocalResourceRequest reqPriv =
new LocalResourceRequest(new Path("file:///tmp1"), 123L,
- LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
+ LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "", null);
List privList =
new ArrayList();
privList.add(reqPriv);
LocalResourceRequest reqApp =
new LocalResourceRequest(new Path("file:///tmp2"), 123L,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "",
+ null);
List appList =
new ArrayList();
appList.add(reqApp);
@@ -2321,7 +2322,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception {
LocalResourceRequest req =
new LocalResourceRequest(new Path("/tmp"), 123L,
- LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "");
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "", null);
// Initializing application
ApplicationImpl app = mock(ApplicationImpl.class);