diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java index e243e9051fb..c556419157c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java @@ -211,4 +211,24 @@ public static LocalResource newInstance(URL url, LocalResourceType type, @Unstable public abstract void setShouldBeUploadedToSharedCache( boolean shouldBeUploadedToSharedCache); + + /** + * Get the image name. + * + * @return image name when the type of resource is + * {@link LocalResourceType#DOCKER_IMAGE}; null otherwise. + */ + @Public + @Unstable + public abstract String getImageName(); + + /** + * Sets the docker image name. This should be set when the resource type is + * {@link LocalResourceType#DOCKER_IMAGE}. + * + * @param imageName docker image name. + */ + @Public + @Unstable + public abstract void setImageName(String imageName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java index 1552cdf8cc3..b7417e9ef1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResourceType.java @@ -38,6 +38,9 @@ *
  • * {@link #PATTERN} - A hybrid between {@link #ARCHIVE} and {@link #FILE}. *
  • + *
  • + * {@link #DOCKER_IMAGE} - Docker image. + *
  • * * * @see LocalResource @@ -66,5 +69,10 @@ * in #{@link LocalResource}. Currently only jars support pattern, all * others will be treated like a #{@link LocalResourceType#ARCHIVE}. */ - PATTERN + PATTERN, + + /** + * Docker Image. + */ + DOCKER_IMAGE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 4573859d384..17c99ef6ee1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -235,6 +235,7 @@ message LocalResourceProto { optional LocalResourceVisibilityProto visibility = 5; optional string pattern = 6; optional bool should_be_uploaded_to_shared_cache = 7; + optional string image_name = 8; } message StringLongMapProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java index 84c3b6e020d..a1fbdd54445 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java @@ -135,6 +135,15 @@ public void setShouldBeUploadedToSharedCache( boolean shouldBeUploadedToSharedCache) { } + + @Override + public String getImageName() { + return null; + } + + @Override + public void setImageName(String imageName) { + } }); // Initialize list of files. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java index 560b081c016..69898db0939 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java @@ -212,6 +212,25 @@ public synchronized void setShouldBeUploadedToSharedCache( builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); } + @Override + public synchronized String getImageName() { + LocalResourceProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasImageName()) { + return null; + } + return p.getImageName(); + } + + @Override + public void setImageName(String imageName) { + maybeInitBuilder(); + if (imageName == null) { + builder.clearImageName(); + return; + } + builder.setImageName(imageName); + } + private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) { return ProtoUtils.convertToProtoFormat(e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ImageLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ImageLocalizer.java new file mode 100644 index 00000000000..91900760a3a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ImageLocalizer.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerImagesCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerPullCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Docker image localizer. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ImageLocalizer extends CompositeService { + + private static final Logger LOG = + LoggerFactory.getLogger(ImageLocalizer.class); + + private ExecutorService imagePuller = + HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat("ImagePuller #%d").build()); + private ScheduledExecutorService statusChecker = HadoopExecutors. + newScheduledThreadPool(10); + + private final Multimap imageToLocalizers = + ArrayListMultimap.create(); + + private final Map downloading = new HashMap<>(); + private final Map readOnlyDownloading; + private final Set localImages = new HashSet<>(); + private final Map downloaded = new HashMap<>(); + private final Map failed = new HashMap<>(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; + + private final Configuration conf; + private final Context nmContext; + private final ResourceLocalizationService rsrcLclzationService; + private final RecordFactory recordFactory; + + public ImageLocalizer(Configuration conf, + ResourceLocalizationService rsrcLclzationService, + Context nmContext) { + super(ImageLocalizer.class.getName()); + this.conf = Preconditions.checkNotNull(conf, "configuration"); + this.nmContext = Preconditions.checkNotNull(nmContext, "nm context"); + this.rsrcLclzationService = Preconditions.checkNotNull(rsrcLclzationService, + "resource localization service"); + recordFactory = RecordFactoryProvider.getRecordFactory(conf); + readOnlyDownloading = Collections.unmodifiableMap(downloading); + + this.readLock = lock.readLock(); + this.writeLock = lock.writeLock(); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + findLocalImages(); + } + + @Override + protected void serviceStart() throws Exception { + statusChecker.scheduleAtFixedRate(new ImageStatusRetriever(this), + STATUS_CHECKER_TIME_INTERVAL, STATUS_CHECKER_TIME_INTERVAL, + TimeUnit.SECONDS); + } + + private void findLocalImages() { + try { + PrivilegedOperationExecutor privOpExecutor = + PrivilegedOperationExecutor.getInstance(conf); + DockerImagesCommand imagesCommand = new DockerImagesCommand(); + String output = DockerCommandExecutor.executeDockerCommand( + imagesCommand, Long.toHexString(System.currentTimeMillis()), + null, privOpExecutor, false, nmContext); + if (!Strings.isNullOrEmpty(output)) { + String[] lines = output.split(System.getProperty("line.separator")); + localImages.addAll(Arrays.asList(lines)); + } + } catch (ContainerExecutionException e) { + LOG.warn("Unable to find existing images", e); + } + } + + @Override + protected void serviceStop() throws Exception { + statusChecker.shutdown(); + imagePuller.shutdown(); + super.serviceStop(); + } + + public void localize(String localizerId, LocalResource localResource) { + try { + writeLock.lock(); + if (isPresent(localResource)) { + return; + } + imageToLocalizers.put(localResource.getImageName(), localizerId); + + if (!downloading.containsKey(localResource.getImageName())) { + downloading.put(localResource.getImageName(), localResource); + imagePuller.submit(new ImageLocalizationTask(this, localResource)); + } else { + LOG.info("{} is already downloading", localResource.getImageName()); + } + } finally { + writeLock.unlock(); + } + } + + private boolean isPresent(LocalResource localResource) { + if (localImages.contains(localResource.getImageName())) { + LOG.info("{} is already present", localResource.getImageName()); + return true; + } + if (downloaded.containsKey(localResource.getImageName())) { + LOG.info("{} already downloaded", localResource.getImageName()); + return true; + } + return false; + } + + private void completedDownload(List completedResources) { + try { + writeLock.lock(); + completedResources.forEach(completed -> { + downloading.remove(completed.getImageName()); + downloaded.put(completed.getImageName(), completed); + }); + postCompleted(completedResources); + } finally { + writeLock.unlock(); + } + } + + private void failedDownload(LocalResource localResource, + SerializedException ex) { + try { + writeLock.lock(); + downloading.remove(localResource.getImageName()); + failed.put(localResource.getImageName(), localResource); + postFailed(localResource, ex); + } finally { + writeLock.unlock(); + } + } + + private void postCompleted(List resources) { + final Multimap statusesPerLocalizer = + ArrayListMultimap.create(); + + resources.forEach(resource -> { + LocalResourceStatus stat = + recordFactory.newRecordInstance(LocalResourceStatus.class); + stat.setResource(resource); + stat.setStatus(ResourceStatusType.FETCH_SUCCESS); + Collection localizers = imageToLocalizers.removeAll( + resource.getImageName()); + if (localizers != null) { + localizers.forEach(localizerId -> + statusesPerLocalizer.put(localizerId, stat)); + } + }); + + statusesPerLocalizer.asMap().forEach((localizerId, + localResourceStatuses) -> { + LocalizerStatus localizerStatus = + recordFactory.newRecordInstance(LocalizerStatus.class); + localizerStatus.setLocalizerId(localizerId); + localResourceStatuses.forEach(localizerStatus::addResourceStatus); + rsrcLclzationService.processImageLocalizerResponse(localizerStatus); + }); + } + + private void postFailed(LocalResource resource, + SerializedException ex) { + final Map statusesPerLocalizer = + new HashMap<>(); + + LocalResourceStatus stat = + recordFactory.newRecordInstance(LocalResourceStatus.class); + stat.setResource(resource); + stat.setStatus(ResourceStatusType.FETCH_FAILURE); + stat.setException(ex); + + Collection localizers = imageToLocalizers.removeAll( + resource.getImageName()); + if (localizers != null) { + localizers.forEach(localizerId -> + statusesPerLocalizer.put(localizerId, stat)); + } + statusesPerLocalizer.forEach((localizerId, + localResourceStatus) -> { + LocalizerStatus localizerStatus = + recordFactory.newRecordInstance(LocalizerStatus.class); + localizerStatus.setLocalizerId(localizerId); + localizerStatus.addResourceStatus(localResourceStatus); + rsrcLclzationService.processImageLocalizerResponse(localizerStatus); + }); + } + + private Map getDownloading() { + return readOnlyDownloading; + } + + private static class ImageLocalizationTask implements Runnable { + + private final ImageLocalizer localizer; + private final LocalResource localResource; + + ImageLocalizationTask(ImageLocalizer localizer, + LocalResource localResource) { + this.localizer = Preconditions.checkNotNull(localizer, "localizer"); + this.localResource = Preconditions.checkNotNull(localResource, + "local resource"); + } + + @Override + public void run() { + try { + PrivilegedOperationExecutor privOpExecutor = + PrivilegedOperationExecutor.getInstance(localizer.conf); + LOG.info("Localize docker image : {}", localResource.getImageName()); + + DockerPullCommand pullCommand = new DockerPullCommand( + localResource.getImageName()); + String output = DockerCommandExecutor.executeDockerCommand( + pullCommand, Long.toHexString(System.currentTimeMillis()), + null, privOpExecutor, false, localizer.nmContext); + if (output.contains(ERROR_RESPONSE)) { + LOG.error("Unable to localize docker image : {}", output); + localizer.failedDownload(localResource, + SerializedException.newInstance(new YarnException(output))); + } + } catch (ContainerExecutionException e) { + LOG.warn("Unable to localize docker image : {}", + localResource.getImageName()); + localizer.failedDownload(localResource, + SerializedException.newInstance(e.getCause())); + } + } + } + + private static class ImageStatusRetriever implements Runnable { + + private final ImageLocalizer localizer; + + ImageStatusRetriever(ImageLocalizer localizer) { + this.localizer = Preconditions.checkNotNull(localizer, "localizer"); + } + + @Override + public void run() { + List completed = new ArrayList<>(); + localizer.getDownloading().forEach((imageName, localResource) -> { + try { + PrivilegedOperationExecutor privOpExecutor = + PrivilegedOperationExecutor.getInstance(localizer.conf); + DockerImagesCommand imagesCommand = new DockerImagesCommand() + .getSingleImageStatus(localResource.getImageName()); + String output = DockerCommandExecutor.executeDockerCommand( + imagesCommand, Long.toHexString(System.currentTimeMillis()), + null, privOpExecutor, false, localizer.nmContext); + if (output != null) { + output = output.trim(); + } + if (!Strings.isNullOrEmpty(output)) { + completed.add(localResource); + } + } catch (ContainerExecutionException e) { + LOG.warn("Unable to find existing image {}", + localResource.getImageName(), e); + } + }); + + if (!completed.isEmpty()) { + localizer.completedDownload(completed); + } + } + } + + private static final String ERROR_RESPONSE = "Error response from daemon"; + private static int STATUS_CHECKER_TIME_INTERVAL = 1; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java index d2e8e22d459..954c5c6e79e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java @@ -19,13 +19,13 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import java.net.URISyntaxException; +import java.util.Objects; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; -import org.apache.hadoop.yarn.util.ConverterUtils; public class LocalResourceRequest extends LocalResource implements Comparable { @@ -35,6 +35,7 @@ private final LocalResourceType type; private final LocalResourceVisibility visibility; private final String pattern; + private final String imageName; /** * Wrap API resource to match against cache of localized resources. @@ -43,20 +44,24 @@ */ public LocalResourceRequest(LocalResource resource) throws URISyntaxException { - this(resource.getResource().toPath(), + this(!resource.getType().equals(LocalResourceType.DOCKER_IMAGE) ? + resource.getResource().toPath() : null, resource.getTimestamp(), resource.getType(), resource.getVisibility(), - resource.getPattern()); + resource.getPattern(), + resource.getImageName()); } LocalResourceRequest(Path loc, long timestamp, LocalResourceType type, - LocalResourceVisibility visibility, String pattern) { + LocalResourceVisibility visibility, String pattern, + String imageName) { this.loc = loc; this.timestamp = timestamp; this.type = type; this.visibility = visibility; this.pattern = pattern; + this.imageName = imageName; } @Override @@ -67,6 +72,9 @@ public int hashCode() { if(pattern != null) { hash = hash ^ pattern.hashCode(); } + if (imageName != null) { + hash = hash ^ imageName.hashCode(); + } return hash; } @@ -86,7 +94,8 @@ public boolean equals(Object o) { return getPath().equals(other.getPath()) && getTimestamp() == other.getTimestamp() && getType() == other.getType() && - patternEquals; + patternEquals && + Objects.equals(imageName, other.imageName); } @Override @@ -111,6 +120,17 @@ public int compareTo(LocalResourceRequest other) { } else { ret = pattern.compareTo(otherPattern); } + + if (0 == ret) { + if (imageName == null && other.imageName != null) { + ret = -1; + } else if (imageName != null && other.imageName == null) { + ret = 1; + } else { + ret = Objects.compare(imageName, other.imageName, + String::compareTo); + } + } } } } @@ -191,7 +211,17 @@ public void setVisibility(LocalResourceVisibility visibility) { public void setPattern(String pattern) { throw new UnsupportedOperationException(); } - + + @Override + public String getImageName() { + return imageName; + } + + @Override + public void setImageName(String imageName) { + throw new UnsupportedOperationException(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -199,7 +229,8 @@ public String toString() { sb.append(getPath().toString()).append(", "); sb.append(getTimestamp()).append(", "); sb.append(getType()).append(", "); - sb.append(getPattern()).append(" }"); + sb.append(getPattern()).append(", "); + sb.append(getImageName()).append(" }"); return sb.toString(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 8944ba9d6ed..8de988511ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -20,6 +20,8 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants; import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,6 +184,7 @@ private DirsChangeListener logDirsChangeListener; private Context nmContext; private DiskValidator diskValidator; + private ImageLocalizer imageLocalizer; /** * Map of LocalResourceTrackers keyed by username, for private @@ -292,6 +295,22 @@ public void onDirsChanged() { initializeLogDirs(lfs); } }; + + boolean isDockerRuntimeEnabled = false; + String[] configuredRuntimes = conf.getTrimmedStrings( + YarnConfiguration.LINUX_CONTAINER_RUNTIME_ALLOWED_RUNTIMES, + YarnConfiguration.DEFAULT_LINUX_CONTAINER_RUNTIME_ALLOWED_RUNTIMES); + for (String runtime : configuredRuntimes) { + if (LinuxContainerRuntimeConstants.RuntimeType.DOCKER.name() + .equals(runtime.toUpperCase())) { + isDockerRuntimeEnabled = true; + break; + } + } + if (isDockerRuntimeEnabled) { + this.imageLocalizer = new ImageLocalizer(conf, this, nmContext); + imageLocalizer.serviceInit(conf); + } super.serviceInit(conf); } @@ -378,6 +397,10 @@ public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) { return localizerTracker.processHeartbeat(status); } + public void processImageLocalizerResponse(LocalizerStatus status) { + localizerTracker.processImageLocalizerResponse(status); + } + @Override public void serviceStart() throws Exception { cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), @@ -393,6 +416,9 @@ public void serviceStart() throws Exception { super.serviceStart(); dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener); dirsHandler.registerLogDirsChangeListener(logDirsChangeListener); + if (imageLocalizer != null) { + imageLocalizer.serviceStart(); + } } LocalizerTracker createLocalizerTracker(Configuration conf) { @@ -429,6 +455,9 @@ public void serviceStop() throws Exception { server.stop(); } cacheCleanup.shutdown(); + if (imageLocalizer != null) { + imageLocalizer.serviceStop(); + } super.serviceStop(); } @@ -760,7 +789,17 @@ public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { return localizer.processHeartbeat(status.getResources()); } } - + + public void processImageLocalizerResponse(LocalizerStatus status) { + String locId = status.getLocalizerId(); + synchronized (privLocalizers) { + LocalizerRunner localizer = privLocalizers.get(locId); + if (localizer != null) { + localizer.processImageLocalizerResults(status.getResources()); + } + } + } + @Override public void serviceStop() throws Exception { for (LocalizerRunner localizer : privLocalizers.values()) { @@ -1043,7 +1082,13 @@ public void run() { } public void addResource(LocalizerResourceRequestEvent request) { - pending.add(request); + if (request.getResource().getRequest().getType().equals( + LocalResourceType.DOCKER_IMAGE)) { + imageLocalizer.localize(localizerId, + request.getResource().getRequest()); + } else { + pending.add(request); + } } public void endContainerLocalization() { @@ -1208,6 +1253,48 @@ LocalizerHeartbeatResponse processHeartbeat( return response; } + void processImageLocalizerResults(List statuses) { + LocalizerHeartbeatResponse response = + recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); + String user = context.getUser(); + ApplicationId applicationId = + context.getContainerId().getApplicationAttemptId().getApplicationId(); + + // Update resource statuses. + for (LocalResourceStatus stat : statuses) { + LocalResource rsrc = stat.getResource(); + LocalResourceRequest rsrcReq = null; + try { + rsrcReq = new LocalResourceRequest(rsrc); + } catch (URISyntaxException e) { + LOG.error("while creating rsrc req {}", rsrc.getImageName(), e); + continue; + } + LocalResourcesTracker tracker = getLocalResourcesTracker( + rsrc.getVisibility(), user, applicationId); + if (tracker == null) { + continue; + } + switch (stat.getStatus()) { + case FETCH_SUCCESS: + tracker.handle(new ResourceLocalizedEvent(rsrcReq, + null, stat.getLocalSize())); + break; + case FETCH_FAILURE: + final String diagnostics = stat.getException().toString(); + tracker.handle(new ResourceFailedLocalizationEvent(rsrcReq, + diagnostics)); + break; + default: + LOG.info("Unknown status {} : {}", rsrc.getImageName(), + stat.getStatus()); + tracker.handle(new ResourceFailedLocalizationEvent(rsrcReq, + stat.getException().getMessage())); + break; + } + } + } + private Path getPathForLocalization(LocalResource rsrc, LocalResourcesTracker tracker) throws IOException, URISyntaxException { String user = context.getUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java index 9f5b23c0d25..5b602e375a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheCleanup.java @@ -212,7 +212,7 @@ private LocalResourceRequest addResource( private LocalResourceRequest createLocalResourceRequest(String path, long timestamp) { return new LocalResourceRequest(new Path(path), timestamp, - LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null); + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null, null); } private LocalizedResource createLocalizedResource(long size, int refCount, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 15d9e2cdb7b..2703337d55c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -979,7 +979,7 @@ private LocalResourceRequest createLocalResourceRequest(String user, int i, long ts, LocalResourceVisibility vis) { final LocalResourceRequest req = new LocalResourceRequest(new Path("file:///tmp/" + user + "/rsrc" + i), - ts + i * 2000, LocalResourceType.FILE, vis, null); + ts + i * 2000, LocalResourceType.FILE, vis, null, null); return req; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 9f5f608fedb..258665db2db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -2000,7 +2000,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception { LocalResourceRequest req = new LocalResourceRequest(new Path("file:///tmp"), 123L, - LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "", null); // We need to pre-populate the LocalizerRunner as the // Resource Localization Service code internally starts them which @@ -2170,14 +2170,15 @@ public void testLocalResourcePath() throws Exception { // 2) Application resource LocalResourceRequest reqPriv = new LocalResourceRequest(new Path("file:///tmp1"), 123L, - LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "", null); List privList = new ArrayList(); privList.add(reqPriv); LocalResourceRequest reqApp = new LocalResourceRequest(new Path("file:///tmp2"), 123L, - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, ""); + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "", + null); List appList = new ArrayList(); appList.add(reqApp); @@ -2321,7 +2322,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception { LocalResourceRequest req = new LocalResourceRequest(new Path("/tmp"), 123L, - LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "", null); // Initializing application ApplicationImpl app = mock(ApplicationImpl.class);