diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index aa9be4b..3ba0b2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings("unchecked") @Private public class ApplicationMasterService extends AbstractService implements @@ -91,8 +93,8 @@ private Server server; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final ConcurrentMap responseMap = - new ConcurrentHashMap(); + private final ConcurrentMap responseMap = + new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; @@ -141,7 +143,9 @@ public InetSocketAddress getBindAddress() { return this.bindAddress; } - private void authorizeRequest(ApplicationAttemptId appAttemptID) + @Private + @VisibleForTesting + protected void authorizeRequest(ApplicationAttemptId appAttemptID) throws YarnException { if (!UserGroupInformation.isSecurityEnabled()) { @@ -174,26 +178,40 @@ private void authorizeRequest(ApplicationAttemptId appAttemptID) public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - + ApplicationAttemptId applicationAttemptId = request .getApplicationAttemptId(); authorizeRequest(applicationAttemptId); - + ApplicationId appID = applicationAttemptId.getApplicationId(); - AllocateResponse lastResponse = responseMap.get(applicationAttemptId); - if (lastResponse == null) { + AllocateResponseWrapper res = responseMap.get(applicationAttemptId); + if (res == null) { String message = "Application doesn't exist in cache " + applicationAttemptId; - LOG.error(message); - RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(), + RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID) + .getUser(), AuditConstants.REGISTER_AM, message, "ApplicationMasterService", "Error in registering application master", appID, applicationAttemptId); - throw RPCUtil.getRemoteException(message); + throwApplicationAttemptDoesNotExistInCacheException( + applicationAttemptId); } + // Allow only one thread in AM to do registerApp at a time. - synchronized (lastResponse) { + synchronized (res) { + AllocateResponse lastResponse = res.getAllocateResponse(); + if (lastResponse == null) { + String message = "Application doesn't exist in cache " + + applicationAttemptId; + RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID) + .getUser(), + AuditConstants.REGISTER_AM, message, "ApplicationMasterService", + "Error in registering application master", appID, + applicationAttemptId); + throwApplicationAttemptDoesNotExistInCacheException( + applicationAttemptId); + } if (hasApplicationMasterRegistered(applicationAttemptId)) { String message = @@ -214,7 +232,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // Setting the response id to 0 to identify if the // application master is register for the respective attemptid lastResponse.setResponseId(0); - responseMap.put(applicationAttemptId, lastResponse); + res.setAllocateResponse(lastResponse); LOG.info("AM registration " + applicationAttemptId); this.rmContext .getDispatcher() @@ -251,17 +269,17 @@ public FinishApplicationMasterResponse finishApplicationMaster( .getApplicationAttemptId(); authorizeRequest(applicationAttemptId); - AllocateResponse lastResponse = responseMap.get(applicationAttemptId); - if (lastResponse == null) { - String message = "Application doesn't exist in cache " - + applicationAttemptId; - LOG.error(message); - throw RPCUtil.getRemoteException(message); + AllocateResponseWrapper res = responseMap.get(applicationAttemptId); + if (res == null) { + throwApplicationAttemptDoesNotExistInCacheException(applicationAttemptId); } // Allow only one thread in AM to do finishApp at a time. - synchronized (lastResponse) { - + synchronized (res) { + if (res.getAllocateResponse() == null) { + throwApplicationAttemptDoesNotExistInCacheException(applicationAttemptId); + } + this.amLivelinessMonitor.receivedPing(applicationAttemptId); rmContext.getDispatcher().getEventHandler().handle( @@ -275,6 +293,14 @@ public FinishApplicationMasterResponse finishApplicationMaster( } } + private void throwApplicationAttemptDoesNotExistInCacheException( + ApplicationAttemptId appAttemptId) throws YarnException { + String message = "Application doesn't exist in cache " + + appAttemptId; + LOG.error(message); + throw RPCUtil.getRemoteException(message); + } + /** * @param appAttemptId * @return true if application is registered for the respective attemptid @@ -282,10 +308,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( public boolean hasApplicationMasterRegistered( ApplicationAttemptId appAttemptId) { boolean hasApplicationMasterRegistered = false; - AllocateResponse lastResponse = responseMap.get(appAttemptId); + AllocateResponseWrapper lastResponse = responseMap.get(appAttemptId); if (lastResponse != null) { synchronized (lastResponse) { - if (lastResponse.getResponseId() >= 0) { + if (lastResponse.getAllocateResponse() != null + && lastResponse.getAllocateResponse().getResponseId() >= 0) { hasApplicationMasterRegistered = true; } } @@ -303,38 +330,42 @@ public AllocateResponse allocate(AllocateRequest request) this.amLivelinessMonitor.receivedPing(appAttemptId); /* check if its in cache */ - AllocateResponse lastResponse = responseMap.get(appAttemptId); - if (lastResponse == null) { + AllocateResponseWrapper res = responseMap.get(appAttemptId); + if (res == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); return resync; } - - if (!hasApplicationMasterRegistered(appAttemptId)) { - String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, appAttemptId.getApplicationId(), - appAttemptId); - throw new InvalidApplicationMasterRequestException(message); - } + synchronized (res) { + AllocateResponse lastResponse = res.getAllocateResponse(); + if (lastResponse == null) { + LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); + return resync; + } + if (!hasApplicationMasterRegistered(appAttemptId)) { + String message = + "Application Master is trying to allocate before registering for: " + + appAttemptId.getApplicationId(); + LOG.error(message); + RMAuditLogger.logFailure( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getUser(), AuditConstants.REGISTER_AM, "", + "ApplicationMasterService", message, + appAttemptId.getApplicationId(), + appAttemptId); + throw new InvalidApplicationMasterRequestException(message); + } - if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { - /* old heartbeat */ - return lastResponse; - } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); - // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: - // Reboot is not useful since after AM reboots, it will send register and - // get an exception. Might as well throw an exception here. - return resync; - } - - // Allow only one thread in AM to do heartbeat at a time. - synchronized (lastResponse) { + if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { + /* old heartbeat */ + return lastResponse; + } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { + LOG.error("Invalid responseid from appAttemptId " + appAttemptId); + // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: + // Reboot is not useful since after AM reboots, it will send register + // and + // get an exception. Might as well throw an exception here. + return resync; + } // Send the status update to the appAttempt. this.rmContext.getDispatcher().getEventHandler().handle( @@ -343,15 +374,16 @@ public AllocateResponse allocate(AllocateRequest request) List ask = request.getAskList(); List release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest(); - List blacklistAdditions = - (blacklistRequest != null) ? + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List blacklistAdditions = + (blacklistRequest != null) ? blacklistRequest.getBlacklistAdditions() : null; - List blacklistRemovals = - (blacklistRequest != null) ? + List blacklistRemovals = + (blacklistRequest != null) ? blacklistRequest.getBlacklistRemovals() : null; - + // sanity check try { SchedulerUtils.validateResourceRequests(ask, @@ -360,32 +392,32 @@ public AllocateResponse allocate(AllocateRequest request) LOG.warn("Invalid resource ask by application " + appAttemptId, e); throw e; } - + try { SchedulerUtils.validateBlacklistRequest(blacklistRequest); - } catch (InvalidResourceBlacklistRequestException e) { + } catch (InvalidResourceBlacklistRequestException e) { LOG.warn("Invalid blacklist request by application " + appAttemptId, e); throw e; } - + // Send new requests to appAttempt. Allocation allocation = - this.rScheduler.allocate(appAttemptId, ask, release, + this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals); RMApp app = this.rmContext.getRMApps().get( appAttemptId.getApplicationId()); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - + AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); // update the response with the deltas of node status changes List updatedNodes = new ArrayList(); - if(app.pullRMNodeUpdates(updatedNodes) > 0) { + if (app.pullRMNodeUpdates(updatedNodes) > 0) { List updatedNodeReports = new ArrayList(); - for(RMNode rmNode: updatedNodes) { - SchedulerNodeReport schedulerNodeReport = + for (RMNode rmNode : updatedNodes) { + SchedulerNodeReport schedulerNodeReport = rScheduler.getNodeReport(rmNode.getNodeID()); Resource used = BuilderUtils.newResource(0, 0); int numContainers = 0; @@ -399,7 +431,7 @@ public AllocateResponse allocate(AllocateRequest request) rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), rmNode.getLastHealthReportTime()); - + updatedNodeReports.add(report); } allocateResponse.setUpdatedNodes(updatedNodeReports); @@ -410,11 +442,12 @@ public AllocateResponse allocate(AllocateRequest request) .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setAvailableResources(allocation.getResourceLimit()); - + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - + // add preemption to the allocateResponse message (if any) - allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); // Adding NMTokens for allocated containers. if (!allocation.getContainers().isEmpty()) { @@ -422,21 +455,9 @@ public AllocateResponse allocate(AllocateRequest request) .createAndGetNMTokens(app.getUser(), appAttemptId, allocation.getContainers())); } - - // before returning response, verify in sync - AllocateResponse oldResponse = - responseMap.put(appAttemptId, allocateResponse); - if (oldResponse == null) { - // appAttempt got unregistered, remove it back out - responseMap.remove(appAttemptId); - String message = "App Attempt removed from the cache during allocate" - + appAttemptId; - LOG.error(message); - return resync; - } - + res.setAllocateResponse(allocateResponse); return allocateResponse; - } + } } private PreemptionMessage generatePreemptionMessage(Allocation allocation){ @@ -498,7 +519,7 @@ public void registerAppAttempt(ApplicationAttemptId attemptId) { // attemptID get registered response.setResponseId(-1); LOG.info("Registering app attempt : " + attemptId); - responseMap.put(attemptId, response); + responseMap.put(attemptId, new AllocateResponseWrapper(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } @@ -520,4 +541,20 @@ protected void serviceStop() throws Exception { } super.serviceStop(); } -} + + public static class AllocateResponseWrapper { + private AllocateResponse response; + + public AllocateResponseWrapper(AllocateResponse response) { + this.response = response; + } + + public synchronized AllocateResponse getAllocateResponse() { + return response; + } + + public synchronized void setAllocateResponse(AllocateResponse response) { + this.response = response; + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java new file mode 100644 index 0000000..2707897 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.junit.Test; + +public class TestApplicationMasterService { + + @Test (timeout = 60000) + public void testConcurrentAllocateRequest() throws Exception { + final AtomicInteger count = new AtomicInteger(0); + MockRM rm = new MockRM() { + + protected ApplicationMasterService createApplicationMasterService() { + return new ApplicationMasterService(getRMContext(), + getResourceScheduler()) { + + @Override + protected void authorizeRequest(ApplicationAttemptId appAttemptID) + throws YarnException { + int interval = 10; + count.incrementAndGet(); + while (count.get() == 1 && interval-- > 0 ) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + Assert.assertTrue(count.get() > 1); + } + }; + }; + }; + rm.start(); + + ApplicationMasterService amService = rm.getApplicationMasterService(); + + MockNM nm1 = rm.registerNode("h1:1234", 5000); + + RMApp app = rm.submitApp(2000); + + // Trigger the scheduling so the AM gets 'launched' + nm1.nodeHeartbeat(true); + + // temporarily allowing single allocate request to go through so that we + // can get responseId. + count.set(1); + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + + am.registerAppAttempt(); + + AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt + .getAppAttemptId(), 0, 0F, null, null, null); + + AllocateResponse response = amService.allocate(allocateRequest); + Assert.assertEquals(1, response.getResponseId()); + + allocateRequest = AllocateRequest.newInstance(attempt + .getAppAttemptId(), response.getResponseId(), 0F, null, null, null); + + // Now we will set it back to 0 so that we can block 2 threads sending + // identical allocate request. + + count.set(0); + Queue responseQ = + new LinkedBlockingDeque(); + Thread requestor1 = + new Thread(new Requestor(responseQ, allocateRequest, amService)); + requestor1.start(); + int interval = 10; + while (count.get() < 1 && interval-- > 0) { + Thread.sleep(1000); + } + Assert.assertTrue(count.get() == 1); + + Thread requestor2 = + new Thread(new Requestor(responseQ, allocateRequest, amService)); + requestor2.start(); + requestor1.join(); + requestor2.join(); + interval = 10; + while (responseQ.size() < 2 && interval-- > 0) { + Thread.sleep(1000); + } + Assert.assertTrue(responseQ.size() == 2); + AllocateResponse response1 = responseQ.poll(); + AllocateResponse response2 = responseQ.poll(); + Assert + .assertEquals(response.getResponseId() + 1, response1.getResponseId()); + Assert + .assertEquals(response.getResponseId() + 1, response2.getResponseId()); + } + + class Requestor implements Runnable { + + private Queue responseQ; + private ApplicationMasterService amService; + private AllocateRequest request; + public Requestor(Queue responseQ, + AllocateRequest request, ApplicationMasterService amService) { + this.responseQ = responseQ; + this.request = request; + this.amService = amService; + } + + @Override + public void run() { + try { + responseQ.add(amService.allocate(request)); + } catch (Exception e) { + Assert.fail("No exception was expected."); + } + } + } + +}