diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultHeadroomProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultHeadroomProvider.java new file mode 100644 index 0000000..c094f0f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultHeadroomProvider.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +public class DefaultHeadroomProvider implements HeadroomProvider { + + private Resource headroom = Resource.newInstance(0, 0); + + @Override + public void setHeadroom(Resource headroom) { + this.headroom = headroom; + } + + @Override + public Resource getHeadroom() { + // Corner case to deal with applications being slightly over-limit + if (headroom.getMemory() < 0) { + headroom.setMemory(0); + } + return headroom; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/HeadroomProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/HeadroomProvider.java new file mode 100644 index 0000000..23c8ceb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/HeadroomProvider.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Implementers accept and provide headroom information for applications, + * potentially recalculating on demand to provide more up-to-date information + */ +public interface HeadroomProvider { + + public void setHeadroom(Resource headroom); + + public Resource getHeadroom(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 933f456..7534cda 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -79,12 +79,13 @@ private final Multiset reReservations = HashMultiset.create(); protected final Resource currentReservation = Resource.newInstance(0, 0); - private Resource resourceLimit = Resource.newInstance(0, 0); protected Resource currentConsumption = Resource.newInstance(0, 0); private Resource amResource; private boolean unmanagedAM = true; private boolean amRunning = false; + private HeadroomProvider headroomProvider = new DefaultHeadroomProvider(); + protected List newlyAllocatedContainers = new ArrayList(); @@ -215,6 +216,14 @@ public void setAmRunning(boolean bool) { amRunning = bool; } + public synchronized void setHeadroomProvider(HeadroomProvider headroomProvider) { + this.headroomProvider = headroomProvider; + } + + public synchronized HeadroomProvider getHeadroomProvider() { + return headroomProvider; + } + public boolean getUnmanagedAM() { return unmanagedAM; } @@ -342,7 +351,7 @@ public synchronized boolean isReserved(SchedulerNode node, Priority priority) { } public synchronized void setHeadroom(Resource globalLimit) { - this.resourceLimit = globalLimit; + headroomProvider.setHeadroom(globalLimit); } /** @@ -350,12 +359,7 @@ public synchronized void setHeadroom(Resource globalLimit) { * @return available resource headroom */ public synchronized Resource getHeadroom() { - // Corner case to deal with applications being slightly over-limit - if (resourceLimit.getMemory() < 0) { - resourceLimit.setMemory(0); - } - - return resourceLimit; + return headroomProvider.getHeadroom(); } public synchronized int getNumReservedContainers(Priority priority) { @@ -518,7 +522,7 @@ public synchronized ApplicationResourceUsageReport getResourceUsageReport() { } public synchronized Resource getResourceLimit() { - return this.resourceLimit; + return headroomProvider.getHeadroom(); } public synchronized Map getLastScheduledContainer() { @@ -530,7 +534,7 @@ public synchronized void transferStateFromPreviousAttempt( this.liveContainers = appAttempt.getLiveContainersMap(); // this.reReservations = appAttempt.reReservations; this.currentConsumption = appAttempt.getCurrentConsumption(); - this.resourceLimit = appAttempt.getResourceLimit(); + this.headroomProvider = appAttempt.getHeadroomProvider(); // this.currentReservation = appAttempt.currentReservation; // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; // this.schedulingOpportunities = appAttempt.schedulingOpportunities; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c8a73bf..dcd1d5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -463,6 +463,10 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) // Re-configure queues root.reinitialize(newRoot, clusterResource); + + // Re-calculate headroom for active applications + root.updateClusterResource(clusterResource); + initializeQueueMappings(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c93c5f..62df2b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.HeadroomProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -965,20 +967,24 @@ private synchronized boolean assignToQueue(Resource clusterResource, } return true; } + + private Resource getHeadroom(User user, Resource queueMaxCap, + Resource clusterResource, Resource userLimit) { + /** + * Headroom is min((userLimit, queue-max-cap) - consumed) + */ + Resource headroom = + Resources.subtract( + Resources.min(resourceCalculator, clusterResource, + userLimit, queueMaxCap), + user.getConsumedResources()); + return headroom; + } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { - String user = application.getUser(); - - /** - * Headroom is min((userLimit, queue-max-cap) - consumed) - */ - - Resource userLimit = // User limit - computeUserLimit(application, clusterResource, required); - //Max avail capacity needs to take into account usage by ancestor-siblings //which are greater than their base capacity, so we are interested in "max avail" //capacity @@ -992,36 +998,64 @@ private Resource computeUserLimitAndSetHeadroom( absoluteMaxAvailCapacity, minimumAllocation); - Resource userConsumed = getUser(user).getConsumedResources(); - Resource headroom = - Resources.subtract( - Resources.min(resourceCalculator, clusterResource, - userLimit, queueMaxCap), - userConsumed); + String appUser = application.getUser(); - if (LOG.isDebugEnabled()) { - LOG.debug("Headroom calculation for user " + user + ": " + - " userLimit=" + userLimit + - " queueMaxCap=" + queueMaxCap + - " consumed=" + userConsumed + - " headroom=" + headroom); - } + User appQueueUser = getUser(appUser); - application.setHeadroom(headroom); - metrics.setAvailableResourcesToUser(user, headroom); + //The value to return, will be captured when we reach this user during iteration + Resource appUserLimit = null; + + synchronized(this) { //users changes are syncronized on the LeafQueue + //update the lastRequired for this user, it will be used during + //headroom calculation + appQueueUser.setLastRequired(required); + //When computing headroom we do so for all users in a queue since changes to utilization can effect them all + //and failure to do so can result in innacurate headroom, which can lead to application inefficiencies/deadlocks + for (Map.Entry userEntry : users.entrySet()) { + String user = userEntry.getKey(); + User queueUser = userEntry.getValue(); + + //calculate userlimit and headroom for this user, and update the headroom provider + //(which is shared across all applications for a given user) + Resource userLimit = computeUserLimit(user, clusterResource, queueUser); + Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); + queueUser.getHeadroomProvider().setHeadroom(headroom); + + //update metrics, this could also be done only when this user was called + //(in the below conditional) + metrics.setAvailableResourcesToUser(user, headroom); + + //If this is the applications user, we need to grab the userLimit to return it + //and we need to assure it has the per-user-queue-shared headroom provider + if (appUser.equals(user)) { + appUserLimit = userLimit; + application.setHeadroomProvider(queueUser.getHeadroomProvider()); + } + } + } - return userLimit; + if (LOG.isDebugEnabled()) { + LOG.debug("Headroom calculation for user " + appUser + ": " + + " userLimit=" + appUserLimit + + " queueMaxCap=" + queueMaxCap + + " consumed=" + appQueueUser.getConsumedResources() + + " headroom=" + appQueueUser.getHeadroomProvider().getHeadroom()); + } + + return appUserLimit; } @Lock(NoLock.class) - private Resource computeUserLimit(FiCaSchedulerApp application, - Resource clusterResource, Resource required) { + private Resource computeUserLimit(String userName, + Resource clusterResource, User user) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if // we're running below capacity. The 'max' ensures that jobs in queues // with miniscule capacity (< 1 slot) make progress // * If we're running over capacity, then its // (usedResources + required) (which extra resources we are allocating) + + Resource required = user.getLastRequired(); // Allow progress for queues with miniscule capacity final Resource queueCapacity = @@ -1066,13 +1100,12 @@ private Resource computeUserLimit(FiCaSchedulerApp application, minimumAllocation); if (LOG.isDebugEnabled()) { - String userName = application.getUser(); LOG.debug("User limit computation for " + userName + " in queue " + getQueueName() + " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + - " consumed: " + getUser(userName).getConsumedResources() + + " consumed: " + user.getConsumedResources() + " limit: " + limit + " queueCapacity: " + queueCapacity + " qconsumed: " + usedResources + @@ -1492,6 +1525,7 @@ synchronized void releaseResource(Resource clusterResource, String userName = application.getUser(); User user = getUser(userName); user.releaseContainer(resource); + Resources.addTo(application.getHeadroom(), resource); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + @@ -1544,6 +1578,8 @@ public QueueMetrics getMetrics() { Resource consumed = Resources.createResource(0, 0); int pendingApplications = 0; int activeApplications = 0; + HeadroomProvider headroomProvider = new DefaultHeadroomProvider(); + Resource lastRequired = Resources.createResource(0, 0); public Resource getConsumedResources() { return consumed; @@ -1586,6 +1622,22 @@ public synchronized void assignContainer(Resource resource) { public synchronized void releaseContainer(Resource resource) { Resources.subtractFrom(consumed, resource); } + + public synchronized HeadroomProvider getHeadroomProvider() { + return headroomProvider; + } + + public synchronized void setHeadroomProvider(HeadroomProvider headroomProvider) { + this.headroomProvider = headroomProvider; + } + + public synchronized Resource getLastRequired() { + return lastRequired; + } + + public synchronized void setLastRequired(Resource lastRequired) { + this.lastRequired = lastRequired; + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index a9a9975..2083b3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -518,7 +518,7 @@ public void testHeadroom() throws Exception { // Schedule to compute queue.assignContainers(clusterResource, node_0); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = @@ -536,8 +536,8 @@ public void testHeadroom() throws Exception { // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute - verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = @@ -556,17 +556,17 @@ public void testHeadroom() throws Exception { // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom)); - verify(app_1_0).setHeadroom(eq(expectedHeadroom)); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); queue.assignContainers(clusterResource, node_0); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom)); - verify(app_1_0).setHeadroom(eq(expectedHeadroom)); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index d5eb933..b36f572 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -624,6 +624,91 @@ public void testUserLimits() throws Exception { } @Test + public void testUserHeadroomMultiApp() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_1, user_0); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_2, user_1); + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 16*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 16*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + Priority priority = TestUtils.createMockPriority(1); + + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0); + assertEquals(1*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + //Now, headroom is the same for all apps for a given user + queue combo + //and a change to any app's headroom is reflected for all the user's apps + //once those apps are active/have themselves calculated headroom for allocation + //at least one time + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + assertEquals(0*GB, app_1.getHeadroom().getMemory());//not yet active + assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_0.getHeadroom().getMemory()); + assertEquals(1*GB, app_1.getHeadroom().getMemory());//now active + assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active + + //Complete container and verify that headroom is updated, for both apps for the user + RMContainer rmContainer = app_0.getLiveContainers().iterator().next(); + a.completedContainer(clusterResource, app_0, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null); + + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + assertEquals(2*GB, app_1.getHeadroom().getMemory()); + } + + @Test public void testHeadroomWithMaxCap() throws Exception { // Mock the queue LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));