diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RetryCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RetryCommand.java new file mode 100644 index 00000000000..6f8cd4712d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RetryCommand.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.Level; + +import java.util.function.Supplier; + +public class RetryCommand { + private final Supplier check; + private final Supplier nonNullCheck; + private final int timeout; + private final int waitPerLoop; + private final Log log; + + private final Level logLevel; + private final int logInterval; + private final String logStatement; + private final boolean logEveryLoop; + private T resultObject; + + public RetryCommand(Builder builder) { + this.timeout = builder.timeout; + this.waitPerLoop = builder.waitPerLoop; + this.check = builder.check; + this.nonNullCheck = builder.nonNullCheck; + this.log = builder.logger; + this.logLevel = builder.logLevel; + this.logInterval = builder.logInterval; + this.logStatement = builder.logStatement; + this.logEveryLoop = logInterval < 0; + } + + public T execute() { + int loggingCounter = logInterval; + int timeWaiting = 0; + + //always log the first occasion + logStatement(); + do { + if (timeWaiting >= timeout) { + break; + } + if (log.isDebugEnabled()) { + log.debug(": Check the condition for main loop."); + } + + if (logEveryLoop || --loggingCounter <= 0) { + logStatement(); + loggingCounter = logInterval; + } + + if (getCheckResult()) { + if (log.isDebugEnabled()) { + log.debug(": Exiting from the main loop."); + } + return resultObject; + } + + try { + Thread.sleep(waitPerLoop); + timeWaiting += waitPerLoop; + } catch (InterruptedException e) { + // On any interrupt, break the loop and continue execution. + break; + } + } while (true); + return null; + } + + private boolean getCheckResult() { + if (check != null) { + return check.get(); + } else if (nonNullCheck != null) { + T value = nonNullCheck.get(); + if (value != null) { + resultObject = value; + return true; + } + } + return false; + } + + public T getResultObject() { + return resultObject; + } + + private void logStatement() { + if (logLevel == Level.DEBUG) { + if (log.isDebugEnabled()) { + log.debug(": Waiting in main loop."); + log.debug(logStatement); + } + } else if (logLevel == Level.TRACE) { + if (log.isTraceEnabled()) { + log.trace(": Waiting in main loop."); + log.trace(logStatement); + } + } else if (logLevel == Level.ERROR) { + log.error(logStatement); + } else if (logLevel == Level.WARN) { + log.warn(logStatement); + } else { + log.info(logStatement); + } + } + + public static final class Builder { + private Log logger; + private Level logLevel; + private int logInterval = -1; + private String logStatement; + private int timeout = -1; + private int waitPerLoop = -1; + private Supplier check; + private Supplier nonNullCheck; + + public Builder() { + } + + public Builder withLogger(Log logger) { + this.logger = logger; + return this; + } + + public Builder withTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public Builder withWaitPerLoop(int waitPerLoop) { + this.waitPerLoop = waitPerLoop; + return this; + } + + public Builder withLogInterval(int logInterval) { + this.logInterval = logInterval; + return this; + } + + public Builder withCheck(Supplier check) { + this.check = check; + return this; + } + + public Builder withNonNullCheck(Supplier check) { + this.nonNullCheck = check; + return this; + } + + public Builder withLogStatement(String logStatement) { + this.logStatement = logStatement; + return this; + } + + public Builder withLogLevel(Level logLevel) { + this.logLevel = logLevel; + return this; + } + + public RetryCommand build() { + if (logger == null) { + logger = LogFactory.getLog(RetryCommand.class); + } + if (logLevel == null) { + logLevel = Level.DEBUG; + } + + Preconditions.checkState(check != null || nonNullCheck != null, + "Either check or non-null check should not be provided"); + Preconditions.checkNotNull(logStatement, + "logStatement should be defined"); + Preconditions.checkArgument(timeout >= 0, + "timeout should be positive value"); + Preconditions.checkArgument(waitPerLoop >= 0, + "waitPerLoop should be positive value"); + + return new RetryCommand<>(this); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java index 81a965522ce..3f2732af584 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -33,38 +35,49 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.AssignedGpuDevice; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice; +import org.apache.hadoop.yarn.util.RetryCommand; +import org.apache.log4j.Level; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; /** - * Allocate GPU resources according to requirements + * Allocate GPU resources according to requirements. */ public class GpuResourceAllocator { - final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class); + private final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class); private static final int WAIT_MS_PER_LOOP = 1000; private Set allowedGpuDevices = new TreeSet<>(); private Map usedDevices = new TreeMap<>(); private Context nmContext; + private final int waitPeriodForResource; public GpuResourceAllocator(Context ctx) { this.nmContext = ctx; + // Wait for a maximum of 120 seconds if no available GPU are there which + // are yet to be released. + this.waitPeriodForResource = 120 * WAIT_MS_PER_LOOP; + } + + @VisibleForTesting + GpuResourceAllocator(Context ctx, int waitPeriodForResource) { + this.nmContext = ctx; + this.waitPeriodForResource = waitPeriodForResource; } /** - * Contains allowed and denied devices + * Contains allowed and denied devices. * Denied devices will be useful for cgroups devices module to do blacklisting */ static class GpuAllocation { @@ -90,20 +103,13 @@ public GpuResourceAllocator(Context ctx) { } /** - * Add GPU to allowed list + * Add GPU to the allowed list of GPUs. * @param gpuDevice gpu device */ public synchronized void addGpu(GpuDevice gpuDevice) { allowedGpuDevices.add(gpuDevice); } - private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices, - ContainerId containerId) { - return "Failed to find enough GPUs, requestor=" + containerId - + ", #RequestedGPUs=" + numRequestedGpuDevices + ", #availableGpus=" - + getAvailableGpus(); - } - @VisibleForTesting public synchronized int getAvailableGpus() { return allowedGpuDevices.size() - usedDevices.size(); @@ -112,10 +118,10 @@ public synchronized int getAvailableGpus() { public synchronized void recoverAssignedGpus(ContainerId containerId) throws ResourceHandlerException { Container c = nmContext.getContainers().get(containerId); - if (null == c) { + if (c == null) { throw new ResourceHandlerException( - "This shouldn't happen, cannot find container with id=" - + containerId); + "Cannot find container with id=" + containerId + + ", this should not occur under normal circumstances!"); } for (Serializable gpuDeviceSerializable : c.getResourceMappings() @@ -123,7 +129,8 @@ public synchronized void recoverAssignedGpus(ContainerId containerId) if (!(gpuDeviceSerializable instanceof GpuDevice)) { throw new ResourceHandlerException( "Trying to recover device id, however it" - + " is not GpuDevice, this shouldn't happen"); + + " is not an instance of " + GpuDevice.class.getName() + + ", this should not occur under normal circumstances!"); } GpuDevice gpuDevice = (GpuDevice) gpuDeviceSerializable; @@ -132,8 +139,8 @@ public synchronized void recoverAssignedGpus(ContainerId containerId) if (!allowedGpuDevices.contains(gpuDevice)) { throw new ResourceHandlerException( "Try to recover device = " + gpuDevice - + " however it is not in allowed device list:" + StringUtils - .join(",", allowedGpuDevices)); + + " however it is not in the allowed device list:" + + StringUtils.join(",", allowedGpuDevices)); } // Make sure it is not occupied by anybody else @@ -163,7 +170,7 @@ public static int getRequestedGpus(Resource requestedResource) { } /** - * Assign GPU to requestor + * Assign GPU to the specified container. * @param container container to allocate * @return allocation results. * @throws ResourceHandlerException When failed to assign GPUs. @@ -172,31 +179,16 @@ public GpuAllocation assignGpus(Container container) throws ResourceHandlerException { GpuAllocation allocation = internalAssignGpus(container); - // Wait for a maximum of 120 seconds if no available GPU are there which - // are yet to be released. - final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP; - int timeWaiting = 0; - while (allocation == null) { - if (timeWaiting >= timeoutMsecs) { - break; - } - - // Sleep for 1 sec to ensure there are some free GPU devices which are - // getting released. - try { - LOG.info("Container : " + container.getContainerId() - + " is waiting for free GPU devices."); - Thread.sleep(WAIT_MS_PER_LOOP); - timeWaiting += WAIT_MS_PER_LOOP; - allocation = internalAssignGpus(container); - } catch (InterruptedException e) { - // On any interrupt, break the loop and continue execution. - break; + if (allocation == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("GPU allocation for container " + container.getContainerId() + + " was not successful! " + "Starting retry loop.."); } + allocation = startGpuAssignmentLoop(container); } - if(allocation == null) { - String message = "Could not get valid GPU device for container '" + + if (allocation == null) { + final String message = "Could not get valid GPU device for container '" + container.getContainerId() + "' as some other containers might not releasing GPUs."; LOG.warn(message); @@ -205,13 +197,47 @@ public GpuAllocation assignGpus(Container container) return allocation; } + private GpuAllocation startGpuAssignmentLoop(Container container) + throws ResourceHandlerException { + RetryCommand retryCommand; + try { + retryCommand = + new RetryCommand.Builder().withNonNullCheck(() -> { + try { + return internalAssignGpus(container); + } catch (ResourceHandlerException e) { + throw new RuntimeException(e); + } + }).withTimeout(waitPeriodForResource) + .withWaitPerLoop(WAIT_MS_PER_LOOP).withLogger(LOG) + .withLogLevel(Level.INFO).withLogInterval(10) + .withLogStatement("Container: " + container.getContainerId() + + " is waiting for free GPU devices.") + .build(); + } catch (RuntimeException e) { + if (e.getCause() instanceof ResourceHandlerException) { + throw (ResourceHandlerException) e.getCause(); + } else { + throw e; + } + } + return retryCommand.execute(); + } + private synchronized GpuAllocation internalAssignGpus(Container container) throws ResourceHandlerException { Resource requestedResource = container.getResource(); ContainerId containerId = container.getContainerId(); int numRequestedGpuDevices = getRequestedGpus(requestedResource); - // Assign Gpus to container if requested some. + + // Assign GPUs to container if requested some. if (numRequestedGpuDevices > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Trying to assign %d GPUs to container: %s" + + ", #AvailableGPUs=%d, #ReleasingGPUs=%d", + numRequestedGpuDevices, containerId, + getAvailableGpus(), getReleasingGpus())); + } if (numRequestedGpuDevices > getAvailableGpus()) { // If there are some devices which are getting released, wait for few // seconds to get it. @@ -222,8 +248,9 @@ private synchronized GpuAllocation internalAssignGpus(Container container) if (numRequestedGpuDevices > getAvailableGpus()) { throw new ResourceHandlerException( - getResourceHandlerExceptionMessage(numRequestedGpuDevices, - containerId)); + "Failed to find enough GPUs, requestor=" + containerId + + ", #RequestedGPUs=" + numRequestedGpuDevices + + ", #AvailableGPUs=" + getAvailableGpus()); } Set assignedGpus = new TreeSet<>(); @@ -245,7 +272,7 @@ private synchronized GpuAllocation internalAssignGpus(Container container) nmContext.getNMStateStore().storeAssignedResources(container, GPU_URI, new ArrayList<>(assignedGpus)); } catch (IOException e) { - cleanupAssignGpus(containerId); + unassignGpus(containerId); throw new ResourceHandlerException(e); } } @@ -258,50 +285,44 @@ private synchronized GpuAllocation internalAssignGpus(Container container) private synchronized long getReleasingGpus() { long releasingGpus = 0; - Iterator> iter = usedDevices.entrySet() - .iterator(); - while (iter.hasNext()) { - ContainerId containerId = iter.next().getValue(); - Container container; - if ((container = nmContext.getContainers().get(containerId)) != null) { - if (container.isContainerInFinalStates()) { - releasingGpus = releasingGpus + container.getResource() - .getResourceInformation(ResourceInformation.GPU_URI).getValue(); - } + for (ContainerId containerId : ImmutableSet.copyOf(usedDevices.values())) { + Container container = nmContext.getContainers().get(containerId); + if (container != null && container.isContainerInFinalStates()) { + releasingGpus += container.getResource() + .getResourceInformation(ResourceInformation.GPU_URI).getValue(); } } return releasingGpus; } /** - * Clean up all Gpus assigned to containerId + * Clean up all GPUs assigned to containerId. * @param containerId containerId */ - public synchronized void cleanupAssignGpus(ContainerId containerId) { - Iterator> iter = - usedDevices.entrySet().iterator(); - while (iter.hasNext()) { - if (iter.next().getValue().equals(containerId)) { - iter.remove(); - } + public synchronized void unassignGpus(ContainerId containerId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to unassign GPU device from container " + containerId); } + usedDevices.entrySet().removeIf(entry -> + entry.getValue().equals(containerId)); } @VisibleForTesting - public synchronized Map getDeviceAllocationMappingCopy() { - return new HashMap<>(usedDevices); + public synchronized Map getDeviceAllocationMapping() { + return ImmutableMap.copyOf(usedDevices); } - public synchronized List getAllowedGpusCopy() { - return new ArrayList<>(allowedGpuDevices); + public synchronized List getAllowedGpus() { + return ImmutableList.copyOf(allowedGpuDevices); } - public synchronized List getAssignedGpusCopy() { - List assigns = new ArrayList<>(); - for (Map.Entry entry : usedDevices.entrySet()) { - assigns.add(new AssignedGpuDevice(entry.getKey().getIndex(), - entry.getKey().getMinorNumber(), entry.getValue())); - } - return assigns; + public synchronized List getAssignedGpus() { + return usedDevices.entrySet().stream() + .map(e -> { + final GpuDevice gpu = e.getKey(); + ContainerId containerId = e.getValue(); + return new AssignedGpuDevice(gpu.getIndex(), gpu.getMinorNumber(), + containerId); + }).collect(Collectors.toList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java index e25a6b5bee7..be22b53091b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java @@ -175,7 +175,7 @@ public GpuResourceAllocator getGpuAllocator() { @Override public synchronized List postComplete( ContainerId containerId) throws ResourceHandlerException { - gpuAllocator.cleanupAssignGpus(containerId); + gpuAllocator.unassignGpus(containerId); cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, containerId.toString()); return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java index f28218de224..d77a901f16d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; @@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.NMGpuResourceInfo; import java.util.List; -import java.util.Map; public class GpuResourcePlugin implements ResourcePlugin { private GpuResourceHandlerImpl gpuResourceHandler = null; @@ -82,9 +80,9 @@ public NMResourceInfo getNMResourceInfo() throws YarnException { GpuDiscoverer.getInstance().getGpuDeviceInformation(); GpuResourceAllocator gpuResourceAllocator = gpuResourceHandler.getGpuAllocator(); - List totalGpus = gpuResourceAllocator.getAllowedGpusCopy(); + List totalGpus = gpuResourceAllocator.getAllowedGpus(); List assignedGpuDevices = - gpuResourceAllocator.getAssignedGpusCopy(); + gpuResourceAllocator.getAssignedGpus(); return new NMGpuResourceInfo(gpuDeviceInformation, totalGpus, assignedGpuDevices); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceAllocator.java new file mode 100644 index 00000000000..e3520b51fd7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceAllocator.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu.GpuResourceAllocator.GpuAllocation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class TestGpuResourceAllocator { + private static final int WAIT_PERIOD_FOR_RESOURCE = 100; + + private static class ContainerMatcher extends ArgumentMatcher { + + private Container container; + ContainerMatcher(Container container) { + this.container = container; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof Container)) { + return false; + } + Container other = (Container)o; + + long expectedId = container.getContainerId().getContainerId(); + long otherId = other.getContainerId().getContainerId(); + return expectedId == otherId; + } + } + + @Captor + private ArgumentCaptor> gpuCaptor; + + @Mock + private NMContext nmContext; + + @Mock + private NMStateStoreService nmStateStore; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private GpuResourceAllocator testSubject; + + @Before + public void setup() { + TestResourceUtils.addNewTypesToResources(ResourceInformation.GPU_URI); + MockitoAnnotations.initMocks(this); + testSubject = createTestSubject(WAIT_PERIOD_FOR_RESOURCE); + } + + private GpuResourceAllocator createTestSubject(int waitPeriodForResource) { + when(nmContext.getNMStateStore()).thenReturn(nmStateStore); + when(nmContext.getContainers()).thenReturn(new ConcurrentHashMap<>()); + return new GpuResourceAllocator(nmContext, waitPeriodForResource); + } + + private Resource createGpuResourceRequest(int gpus) { + Resource res = Resource.newInstance(1024, 1); + + if (gpus > 0) { + res.setResourceValue(ResourceInformation.GPU_URI, gpus); + } + return res; + } + + private List createMockContainers(int gpus, int numberOfContainers) { + final long id = 111L; + + List containers = Lists.newArrayList(); + for (int i = 0; i < numberOfContainers; i++) { + containers.add(createMockContainer(gpus, id + i)); + } + return containers; + } + + private Container createMockContainer(int gpus, long id) { + Resource res = createGpuResourceRequest(gpus); + ContainerId containerId = mock(ContainerId.class); + when(containerId.getContainerId()).thenReturn(id); + + Container container = mock(Container.class); + when(container.getResource()).thenReturn(res); + when(container.getContainerId()).thenReturn(containerId); + when(container.getContainerState()).thenReturn(ContainerState.RUNNING); + nmContext.getContainers().put(containerId, container); + + return container; + } + + private void createAndAddGpus(int numberOfGpus) { + for (int i = 0; i < numberOfGpus; i++) { + testSubject.addGpu(new GpuDevice(1, i)); + } + + assertEquals(0, testSubject.getDeviceAllocationMapping().size()); + assertEquals(0, testSubject.getAssignedGpus().size()); + assertEquals(numberOfGpus, testSubject.getAllowedGpus().size()); + assertEquals(numberOfGpus, testSubject.getAvailableGpus()); + } + + private void addGpus(GpuDevice... gpus) { + for (GpuDevice gpu : gpus) { + testSubject.addGpu(gpu); + } + assertEquals(0, testSubject.getDeviceAllocationMapping().size()); + assertEquals(0, testSubject.getAssignedGpus().size()); + assertEquals(gpus.length, testSubject.getAllowedGpus().size()); + assertEquals(gpus.length, testSubject.getAvailableGpus()); + } + + private void addGpusAndDontVerify(GpuDevice... gpus) { + for (GpuDevice gpu : gpus) { + testSubject.addGpu(gpu); + } + } + + private void setupContainerAsReleasingGpus(Container... releasingContainers) { + ContainerState[] finalStates = new ContainerState[] { + ContainerState.KILLING, ContainerState.DONE, + ContainerState.LOCALIZATION_FAILED, + ContainerState.CONTAINER_RESOURCES_CLEANINGUP, + ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, + ContainerState.EXITED_WITH_FAILURE, + ContainerState.EXITED_WITH_SUCCESS + }; + + final Random random = new Random(); + for (Container container : releasingContainers) { + ContainerState state = finalStates[random.nextInt(finalStates.length)]; + when(container.getContainerState()).thenReturn(state); + when(container.isContainerInFinalStates()).thenReturn(true); + } + } + + private void assertAllocatedGpu(GpuDevice expectedGpu, Container container, + GpuAllocation allocation) throws IOException { + assertEquals(1, allocation.getAllowedGPUs().size()); + assertEquals(0, allocation.getDeniedGPUs().size()); + + Set allowedGPUs = allocation.getAllowedGPUs(); + assertEquals(1, allowedGPUs.size()); + + GpuDevice allocatedGpu = (GpuDevice) allowedGPUs.toArray()[0]; + assertEquals(expectedGpu, allocatedGpu); + assertAssignmentInStateStore(expectedGpu, container); + } + + private void assertAllocatedGpus(int gpus, int deniedGpus, Container container, + GpuAllocation allocation) throws IOException { + assertEquals(gpus, allocation.getAllowedGPUs().size()); + assertEquals(deniedGpus, allocation.getDeniedGPUs().size()); + assertAssignmentInStateStore(gpus, container); + } + + private void assertNoAllocation(GpuAllocation allocation) { + assertEquals(1, allocation.getDeniedGPUs().size()); + assertEquals(0, allocation.getAllowedGPUs().size()); + verifyZeroInteractions(nmStateStore); + } + + private void assertAssignmentInStateStore(GpuDevice expectedGpu, + Container container) throws IOException { + verify(nmStateStore).storeAssignedResources( + argThat(new ContainerMatcher(container)), eq(GPU_URI), gpuCaptor.capture()); + + List gpuList = gpuCaptor.getValue(); + assertEquals(1, gpuList.size()); + assertEquals(expectedGpu, gpuList.get(0)); + } + + private void assertAssignmentInStateStore(int gpus, + Container container) throws IOException { + verify(nmStateStore).storeAssignedResources( + argThat(new ContainerMatcher(container)), eq(GPU_URI), gpuCaptor.capture()); + + List gpuList = gpuCaptor.getValue(); + assertEquals(gpus, gpuList.size()); + } + + private static Set findDuplicates(List allocations) { + final Set result = new HashSet<>(); + final Set tmpSet = new HashSet<>(); + + for (GpuAllocation allocation : allocations) { + if (!tmpSet.add(allocation)) { + result.add(allocation); + } + } + return result; + } + + @Test + public void testNewGpuAllocatorHasEmptyCollectionOfDevices() { + assertEquals(0, testSubject.getDeviceAllocationMapping().size()); + assertEquals(0, testSubject.getAssignedGpus().size()); + assertEquals(0, testSubject.getAllowedGpus().size()); + assertEquals(0, testSubject.getAvailableGpus()); + } + + @Test + public void testAddOneDevice() { + addGpus(new GpuDevice(1, 1)); + assertEquals(0, testSubject.getDeviceAllocationMapping().size()); + assertEquals(0, testSubject.getAssignedGpus().size()); + } + + @Test + public void testAddMoreDevices() { + addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3)); + assertEquals(0, testSubject.getDeviceAllocationMapping().size()); + assertEquals(0, testSubject.getAssignedGpus().size()); + } + + @Test + public void testAddMoreDevicesWithSameData() { + addGpusAndDontVerify(new GpuDevice(1, 1), new GpuDevice(1, 1)); + assertEquals(0, testSubject.getDeviceAllocationMapping().size()); + assertEquals(0, testSubject.getAssignedGpus().size()); + assertEquals(1, testSubject.getAllowedGpus().size()); + assertEquals(1, testSubject.getAvailableGpus()); + } + + @Test + public void testRequestZeroGpu() throws ResourceHandlerException { + addGpus(new GpuDevice(1, 1)); + + Container container = createMockContainer(0, 5L); + GpuAllocation allocation = + testSubject.assignGpus(container); + + assertNoAllocation(allocation); + } + + @Test + public void testRequestOneGpu() throws ResourceHandlerException, IOException { + GpuDevice gpu = new GpuDevice(1, 1); + addGpus(gpu); + + Container container = createMockContainer(1, 5L); + GpuAllocation allocation = + testSubject.assignGpus(container); + + assertEquals(1, testSubject.getDeviceAllocationMapping().size()); + assertEquals(1, testSubject.getAssignedGpus().size()); + assertEquals(1, testSubject.getAllowedGpus().size()); + assertEquals(0, testSubject.getAvailableGpus()); + + assertAllocatedGpu(gpu, container, allocation); + } + + @Test + public void testRequestMoreThanAvailableGpu() throws ResourceHandlerException { + addGpus(new GpuDevice(1, 1)); + Container container = createMockContainer(2, 5L); + + exception.expect(ResourceHandlerException.class); + exception.expectMessage("Failed to find enough GPUs"); + testSubject.assignGpus(container); + } + + @Test + public void testRequestMoreThanAvailableGpuAndOneContainerIsReleasingGpus() + throws ResourceHandlerException, IOException { + addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3)); + Container container = createMockContainer(2, 5L); + GpuAllocation allocation = testSubject.assignGpus(container); + assertAllocatedGpus(2, 1, container, allocation); + + assertEquals(2, testSubject.getDeviceAllocationMapping().size()); + assertEquals(2, testSubject.getAssignedGpus().size()); + assertEquals(3, testSubject.getAllowedGpus().size()); + assertEquals(1, testSubject.getAvailableGpus()); + + setupContainerAsReleasingGpus(container); + Container container2 = createMockContainer(2, 6L); + + exception.expect(ResourceHandlerException.class); + exception.expectMessage("as some other containers might not releasing GPUs"); + GpuAllocation allocation2 = testSubject.assignGpus(container2); + assertAllocatedGpus(2, 1, container, allocation2); + } + + @Test + public void testThreeContainersJustTwoOfThemSatisfied() + throws ResourceHandlerException, IOException { + addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), + new GpuDevice(1, 3), new GpuDevice(1, 4), + new GpuDevice(1, 5), new GpuDevice(1, 6)); + Container container = createMockContainer(3, 5L); + Container container2 = createMockContainer(2, 6L); + Container container3 = createMockContainer(2, 6L); + + GpuAllocation allocation = testSubject.assignGpus(container); + assertAllocatedGpus(3, 3, container, allocation); + assertEquals(3, testSubject.getDeviceAllocationMapping().size()); + assertEquals(3, testSubject.getAssignedGpus().size()); + assertEquals(6, testSubject.getAllowedGpus().size()); + assertEquals(3, testSubject.getAvailableGpus()); + + GpuAllocation allocation2 = testSubject.assignGpus(container2); + assertAllocatedGpus(2, 4, container2, allocation2); + assertEquals(5, testSubject.getDeviceAllocationMapping().size()); + assertEquals(5, testSubject.getAssignedGpus().size()); + assertEquals(6, testSubject.getAllowedGpus().size()); + assertEquals(1, testSubject.getAvailableGpus()); + + exception.expect(ResourceHandlerException.class); + exception.expectMessage("Failed to find enough GPUs"); + testSubject.assignGpus(container3); + } + + @Test + public void testReleaseAndAssignGpus() + throws ResourceHandlerException, IOException { + addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3)); + Container container = createMockContainer(2, 5L); + GpuAllocation allocation = testSubject.assignGpus(container); + assertAllocatedGpus(2, 1, container, allocation); + + assertEquals(2, testSubject.getDeviceAllocationMapping().size()); + assertEquals(2, testSubject.getAssignedGpus().size()); + assertEquals(3, testSubject.getAllowedGpus().size()); + assertEquals(1, testSubject.getAvailableGpus()); + + setupContainerAsReleasingGpus(container); + Container container2 = createMockContainer(2, 6L); + try { + testSubject.assignGpus(container2); + } catch (ResourceHandlerException e) { + //intended as we have not enough GPUs available + } + + assertEquals(2, testSubject.getDeviceAllocationMapping().size()); + assertEquals(2, testSubject.getAssignedGpus().size()); + assertEquals(3, testSubject.getAllowedGpus().size()); + assertEquals(1, testSubject.getAvailableGpus()); + + testSubject.unassignGpus(container.getContainerId()); + GpuAllocation allocation2 = testSubject.assignGpus(container2); + assertAllocatedGpus(2, 1, container, allocation2); + } + + @Test + public void testCreateLotsOfContainersVerifyGpuAssignmentsAreCorrect() + throws ResourceHandlerException, IOException { + createAndAddGpus(100); + + List containers = createMockContainers(3, 33); + List allocations = Lists.newArrayList(); + for (Container container : containers) { + GpuAllocation allocation = testSubject.assignGpus(container); + allocations.add(allocation); + assertAllocatedGpus(3, 97, container, allocation); + } + + assertEquals(99, testSubject.getDeviceAllocationMapping().size()); + assertEquals(99, testSubject.getAssignedGpus().size()); + assertEquals(100, testSubject.getAllowedGpus().size()); + assertEquals(1, testSubject.getAvailableGpus()); + + Set duplicateAllocations = findDuplicates(allocations); + assertEquals(0, duplicateAllocations.size()); + } + + @Test + public void testGpuGetsUnassignedWhenStateStoreThrowsException() + throws ResourceHandlerException, IOException { + doThrow(new IOException("Failed to save container mappings to NM state store!")) + .when(nmStateStore).storeAssignedResources(any(Container.class), + anyString(), anyListOf(Serializable.class)); + + createAndAddGpus(1); + + exception.expect(ResourceHandlerException.class); + exception.expectMessage("Failed to save container mappings to NM state store"); + Container container = createMockContainer(1, 5L); + testSubject.assignGpus(container); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java index 18785e151f3..de97ca0dca8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandler.java @@ -404,7 +404,7 @@ public void testRecoverResourceAllocation() throws Exception { gpuResourceHandler.reacquireContainer(getContainerId(1)); Map deviceAllocationMapping = - gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy(); + gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); Assert.assertEquals(2, deviceAllocationMapping.size()); Assert.assertTrue( deviceAllocationMapping.keySet().contains(new GpuDevice(1, 1))); @@ -438,7 +438,7 @@ public void testRecoverResourceAllocation() throws Exception { // Make sure internal state not changed. deviceAllocationMapping = - gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy(); + gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); Assert.assertEquals(2, deviceAllocationMapping.size()); Assert.assertTrue(deviceAllocationMapping.keySet() .containsAll(Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 3)))); @@ -470,7 +470,7 @@ public void testRecoverResourceAllocation() throws Exception { // Make sure internal state not changed. deviceAllocationMapping = - gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy(); + gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); Assert.assertEquals(2, deviceAllocationMapping.size()); Assert.assertTrue(deviceAllocationMapping.keySet() .containsAll(Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 3))));