diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 42b0aa6..1e268b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -149,7 +149,7 @@ private YarnRPC rpc; // Handle to communicate with the Resource Manager - private AMRMClientAsync resourceManager; + private AMRMClientAsync resourceManager; // Application Attempt Id ( combination of attemptId and fail count ) private ApplicationAttemptId appAttemptID; @@ -440,7 +440,9 @@ public boolean run() throws YarnRemoteException, IOException { AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); - resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener); + resourceManager = new AMRMClientAsync(appAttemptID, + 1000, + allocListener); resourceManager.init(conf); resourceManager.start(); @@ -592,11 +594,6 @@ public void onContainersCompleted(List completedContainers) { resourceManager.addContainerRequest(containerAsk); } - // set progress to deliver to RM on next heartbeat - float progress = (float) numCompletedContainers.get() - / numTotalContainers; - resourceManager.setProgress(progress); - if (numCompletedContainers.get() == numTotalContainers) { done = true; } @@ -635,6 +632,18 @@ public void onRebootRequest() {} @Override public void onNodesUpdated(List updatedNodes) {} + + @Override + public float getProgress() { + // set progress to deliver to RM on next heartbeat + float progress = (float) numCompletedContainers.get() + / numTotalContainers; + return progress; + } + + @Override + public void onError(Exception e) { + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java index 9bc8c5f..81147ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -18,8 +18,8 @@ package org.apache.hadoop.yarn.client; - import java.io.IOException; +import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -34,7 +34,7 @@ @InterfaceAudience.Public @InterfaceStability.Unstable -public interface AMRMClient extends Service { +public interface AMRMClient extends Service { /** * Object to represent container request for resources. @@ -57,6 +57,7 @@ public ContainerRequest(Resource capability, String[] hosts, this.priority = priority; this.containerCount = containerCount; } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); @@ -66,6 +67,23 @@ public String toString() { } } + public static class StoredContainerRequest extends ContainerRequest { + T cookie; + + public StoredContainerRequest(Resource capability, String[] hosts, + String[] racks, Priority priority) { + super(capability, hosts, racks, priority, 1); + } + + void setCookie(T cookie) { + this.cookie = cookie; + } + + T getCookie() { + return cookie; + } + } + /** * Register the application master. This must be called before any * other interaction @@ -117,7 +135,7 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, * Request containers for resources before calling allocate * @param req Resource request */ - public void addContainerRequest(ContainerRequest req); + public void addContainerRequest(T req); /** * Remove previous container request. The previous container request may have @@ -126,7 +144,7 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, * even after the remove request * @param req Resource request */ - public void removeContainerRequest(ContainerRequest req); + public void removeContainerRequest(T req); /** * Release containers assigned by the Resource Manager. If the app cannot use @@ -150,4 +168,15 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, * @return Current number of nodes in the cluster */ public int getClusterNodeCount(); + + /** + * Get outstanding StoredContainerRequests matching the given + * parameters. These StoredContainerRequests should have been added via + * addContainerRequest earlier in the lifecycle. + */ + public Collection getMatchingRequests( + Priority priority, + String resourceName, + Resource capability); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java index 649c4ea..a38be81 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -38,7 +39,9 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.service.AbstractService; @@ -88,11 +91,11 @@ */ @Unstable @Evolving -public class AMRMClientAsync extends AbstractService { +public class AMRMClientAsync extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); - private final AMRMClient client; + private final AMRMClient client; private final int intervalMs; private final HeartbeatThread heartbeatThread; private final CallbackHandlerThread handlerThread; @@ -103,14 +106,16 @@ private volatile boolean keepRunning; private volatile float progress; + private volatile Exception savedException; + public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, CallbackHandler callbackHandler) { - this(new AMRMClientImpl(id), intervalMs, callbackHandler); + this(new AMRMClientImpl(id), intervalMs, callbackHandler); } @Private @VisibleForTesting - AMRMClientAsync(AMRMClient client, int intervalMs, + public AMRMClientAsync(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) { super(AMRMClientAsync.class.getName()); this.client = client; @@ -120,23 +125,14 @@ public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, handlerThread = new CallbackHandlerThread(); responseQueue = new LinkedBlockingQueue(); keepRunning = true; + savedException = null; } - - /** - * Sets the application's current progress. It will be transmitted to the - * resource manager on the next heartbeat. - * @param progress - * the application's progress so far - */ - public void setProgress(float progress) { - this.progress = progress; - } - + @Override public void init(Configuration conf) { super.init(conf); client.init(conf); - } + } @Override public void start() { @@ -171,6 +167,13 @@ public void stop() { super.stop(); } + public Collection getMatchingRequests( + Priority priority, + String resourceName, + Resource capability) { + return client.getMatchingRequests(priority, resourceName, capability); + } + /** * Registers this application master with the resource manager. On successful * registration, starts the heartbeating thread. @@ -206,7 +209,7 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, * Request containers for resources before calling allocate * @param req Resource request */ - public void addContainerRequest(AMRMClient.ContainerRequest req) { + public void addContainerRequest(T req) { client.addContainerRequest(req); } @@ -217,7 +220,7 @@ public void addContainerRequest(AMRMClient.ContainerRequest req) { * even after the remove request * @param req Resource request */ - public void removeContainerRequest(AMRMClient.ContainerRequest req) { + public void removeContainerRequest(T req) { client.removeContainerRequest(req); } @@ -267,9 +270,17 @@ public void run() { try { response = client.allocate(progress); } catch (YarnRemoteException ex) { - LOG.error("Failed to heartbeat", ex); + LOG.error("Yarn exception on heartbeat", ex); + savedException = ex; + // interrupt handler thread in case it waiting on the queue + handlerThread.interrupt(); + break; } catch (IOException e) { - LOG.error("Failed to heartbeat", e); + LOG.error("IO exception on heartbeat", e); + savedException = e; + // interrupt handler thread in case it waiting on the queue + handlerThread.interrupt(); + break; } } if (response != null) { @@ -301,14 +312,21 @@ public void run() { while (keepRunning) { AllocateResponse response; try { + if(savedException != null) { + LOG.error("Stopping callback due to: ", savedException); + handler.onError(savedException); + break; + } response = responseQueue.take(); } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting for queue"); + LOG.info("Interrupted while waiting for queue", ex); continue; } if (response.getReboot()) { handler.onRebootRequest(); + LOG.info("Reboot requested. Stopping callback."); + break; } List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { @@ -325,6 +343,8 @@ public void run() { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } + + progress = handler.getProgress(); } } } @@ -347,14 +367,19 @@ public void run() { /** * Called when the ResourceManager wants the ApplicationMaster to reboot - * for being out of sync. + * for being out of sync. The ApplicationMaster should not unregister with + * the RM unless the ApplicationMaster wants to be the last attempt. */ public void onRebootRequest(); /** - * Called when nodes tracked by the ResourceManager have changed in in health, + * Called when nodes tracked by the ResourceManager have changed in health, * availability etc. */ public void onNodesUpdated(List updatedNodes); + + public float getProgress(); + + public void onError(Exception e); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index 350a1cb..4d5d984 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -22,7 +22,9 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -55,8 +58,11 @@ import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.BuilderUtils; +// TODO check inputs for null etc. YARN-654 + @Unstable -public class AMRMClientImpl extends AbstractService implements AMRMClient { +public class AMRMClientImpl + extends AbstractService implements AMRMClient { private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); @@ -70,6 +76,17 @@ protected Resource clusterAvailableResources; protected int clusterNodeCount; + class ResourceRequestInfo { + ResourceRequest remoteRequest; + HashSet containerRequests; + + ResourceRequestInfo(Priority priority, String resourceName, Resource capability) { + remoteRequest = BuilderUtils. + newResourceRequest(priority, resourceName, capability, 0); + containerRequests = new HashSet(); + } + } + //Key -> Priority //Value -> Map //Key->ResourceName (e.g., hostname, rackname, *) @@ -77,9 +94,9 @@ //Key->Resource Capability //Value->ResourceRequest protected final - Map>> + Map>> remoteRequestsTable = - new TreeMap>>(); + new TreeMap>>(); protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator()); @@ -223,42 +240,42 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, } @Override - public synchronized void addContainerRequest(ContainerRequest req) { + public synchronized void addContainerRequest(T req) { // Create resource requests + // add check for dup locations if(req.hosts != null) { for (String host : req.hosts) { - addResourceRequest(req.priority, host, req.capability, req.containerCount); + addResourceRequest(req.priority, host, req.capability, req.containerCount, req); } } if(req.racks != null) { for (String rack : req.racks) { - addResourceRequest(req.priority, rack, req.capability, req.containerCount); + addResourceRequest(req.priority, rack, req.capability, req.containerCount, req); } } // Off-switch - addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.containerCount); + addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.containerCount, req); } @Override - public synchronized void removeContainerRequest(ContainerRequest req) { + public synchronized void removeContainerRequest(T req) { // Update resource requests if(req.hosts != null) { for (String hostName : req.hosts) { - decResourceRequest(req.priority, hostName, req.capability, req.containerCount); + decResourceRequest(req.priority, hostName, req.capability, req.containerCount, req); } } if(req.racks != null) { for (String rack : req.racks) { - decResourceRequest(req.priority, rack, req.capability, req.containerCount); + decResourceRequest(req.priority, rack, req.capability, req.containerCount, req); } } - decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.containerCount); + decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.containerCount, req); + } @Override @@ -276,6 +293,29 @@ public synchronized int getClusterNodeCount() { return clusterNodeCount; } + @Override + public synchronized Collection getMatchingRequests( + Priority priority, + String resourceName, + Resource capability) { + Map> remoteRequests = + this.remoteRequestsTable.get(priority); + if (remoteRequests == null) { + return null; + } + Map reqMap = remoteRequests + .get(resourceName); + if (reqMap == null) { + return null; + } + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); + if (resourceRequestInfo == null) { + return null; + } + + return resourceRequestInfo.containerRequests; + } + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container @@ -294,44 +334,53 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { } private void addResourceRequest(Priority priority, String resourceName, - Resource capability, int containerCount) { - Map> remoteRequests = + Resource capability, int containerCount, T req) { + Map> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { - remoteRequests = new HashMap>(); + remoteRequests = new HashMap>(); this.remoteRequestsTable.put(priority, remoteRequests); if (LOG.isDebugEnabled()) { LOG.debug("Added priority=" + priority); } } - Map reqMap = remoteRequests.get(resourceName); + Map reqMap = remoteRequests.get(resourceName); if (reqMap == null) { - reqMap = new HashMap(); + reqMap = new HashMap(); remoteRequests.put(resourceName, reqMap); } - ResourceRequest remoteRequest = reqMap.get(capability); - if (remoteRequest == null) { - remoteRequest = BuilderUtils. - newResourceRequest(priority, resourceName, capability, 0); - reqMap.put(capability, remoteRequest); + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); + if (resourceRequestInfo == null) { + resourceRequestInfo = + new ResourceRequestInfo(priority, resourceName, capability); + reqMap.put(capability, resourceRequestInfo); } - remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount); + resourceRequestInfo.remoteRequest.setNumContainers( + resourceRequestInfo.remoteRequest.getNumContainers() + containerCount); + + if(req instanceof StoredContainerRequest) { + resourceRequestInfo.containerRequests.add(req); + } // Note this down for next interaction with ResourceManager - addResourceRequestToAsk(remoteRequest); + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); if (LOG.isDebugEnabled()) { LOG.debug("addResourceRequest:" + " applicationId=" + appAttemptId + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); } } - private void decResourceRequest(Priority priority, String resourceName, - Resource capability, int containerCount) { - Map> remoteRequests = + private void decResourceRequest(Priority priority, + String resourceName, + Resource capability, + int containerCount, + T req) { + Map> remoteRequests = this.remoteRequestsTable.get(priority); if(remoteRequests == null) { @@ -342,7 +391,7 @@ private void decResourceRequest(Priority priority, String resourceName, return; } - Map reqMap = remoteRequests.get(resourceName); + Map reqMap = remoteRequests.get(resourceName); if (reqMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("Not decrementing resource as " + resourceName @@ -350,28 +399,34 @@ private void decResourceRequest(Priority priority, String resourceName, } return; } - ResourceRequest remoteRequest = reqMap.get(capability); + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); if (LOG.isDebugEnabled()) { LOG.debug("BEFORE decResourceRequest:" + " applicationId=" + appAttemptId + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); } - remoteRequest. - setNumContainers(remoteRequest.getNumContainers() - containerCount); - if(remoteRequest.getNumContainers() < 0) { + resourceRequestInfo.remoteRequest.setNumContainers( + resourceRequestInfo.remoteRequest.getNumContainers() - containerCount); + + if(req instanceof StoredContainerRequest) { + resourceRequestInfo.containerRequests.remove(req); + } + + if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) { // guard against spurious removals - remoteRequest.setNumContainers(0); + resourceRequestInfo.remoteRequest.setNumContainers(0); } // send the ResourceRequest to RM even if is 0 because it needs to override // a previously sent value. If ResourceRequest was not sent previously then // sending 0 aught to be a no-op on RM - addResourceRequestToAsk(remoteRequest); + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); // delete entries from map if no longer needed - if (remoteRequest.getNumContainers() == 0) { + if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { reqMap.remove(capability); if (reqMap.size() == 0) { remoteRequests.remove(resourceName); @@ -385,7 +440,8 @@ private void decResourceRequest(Priority priority, String resourceName, LOG.info("AFTER decResourceRequest:" + " applicationId=" + appAttemptId + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java index 2764061..862116f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -50,27 +51,37 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.Service.STATE; import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestAMRMClient { - Configuration conf = null; - MiniYARNCluster yarnCluster = null; - YarnClientImpl yarnClient = null; - List nodeReports = null; - ApplicationAttemptId attemptId = null; - int nodeCount = 3; + static Configuration conf = null; + static MiniYARNCluster yarnCluster = null; + static YarnClientImpl yarnClient = null; + static List nodeReports = null; + static ApplicationAttemptId attemptId = null; + static int nodeCount = 3; - @Before - public void setup() throws YarnRemoteException, IOException { + static Resource capability = Records.newRecord(Resource.class); + static Priority priority = Records.newRecord(Priority.class); + static String node; + static String rack; + static String[] nodes; + static String[] racks; + + @BeforeClass + public static void setup() throws Exception { // start minicluster conf = new YarnConfiguration(); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); @@ -84,7 +95,17 @@ public void setup() throws YarnRemoteException, IOException { // get node info nodeReports = yarnClient.getNodeReports(); - + + priority.setPriority(1); + capability.setMemory(1024); + node = nodeReports.get(0).getNodeId().getHost(); + rack = nodeReports.get(0).getRackName(); + nodes = new String[]{ node }; + racks = new String[]{ rack }; + } + + @Before + public void startApp() throws Exception { // submit new app GetNewApplicationResponse newApp = yarnClient.getNewApplication(); ApplicationId appId = newApp.getApplicationId(); @@ -125,7 +146,12 @@ public void setup() throws YarnRemoteException, IOException { } @After - public void tearDown() { + public void cancelApp() { + attemptId = null; + } + + @AfterClass + public static void tearDown() { if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { yarnClient.stop(); } @@ -135,11 +161,73 @@ public void tearDown() { } @Test (timeout=60000) + public void testAMRMClientMatchStorage() throws YarnRemoteException, IOException { + AMRMClientImpl> amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl>(attemptId); + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("Host", 10000, ""); + + StoredContainerRequest storedContainer1 = + new StoredContainerRequest(capability, nodes, racks, priority); + StoredContainerRequest storedContainer2 = + new StoredContainerRequest(capability, nodes, racks, priority); + StoredContainerRequest storedContainer3 = + new StoredContainerRequest(capability, nodes, racks, priority); + Integer cookie = new Integer(1); + storedContainer1.setCookie(cookie); + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer2); + amClient.addContainerRequest(storedContainer3); + + int containersRequestedAny = amClient.remoteRequestsTable.get(priority) + .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + assertTrue(containersRequestedAny == 3); + Collection> matches = + amClient.getMatchingRequests(priority, node, capability); + assertTrue(matches.size() == 3); + matches = amClient.getMatchingRequests(priority, rack, capability); + assertTrue(matches.size() == 3); + matches = + amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); + assertTrue(matches.size() == 3); + amClient.removeContainerRequest(storedContainer3); + matches = + amClient.getMatchingRequests(priority, node, capability); + assertTrue(matches.size() == 2); + amClient.removeContainerRequest(storedContainer2); + matches = + amClient.getMatchingRequests(priority, rack, capability); + assertTrue(matches.size() == 1); + StoredContainerRequest storedRequest = + (StoredContainerRequest) matches.iterator().next(); + assertTrue(storedContainer1 == storedRequest); + assertTrue(cookie == storedRequest.getCookie()); + amClient.removeContainerRequest(storedContainer1); + matches = + amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); + assertTrue(matches == null); // 0 requests left. everything got cleaned up + assertTrue(amClient.remoteRequestsTable.isEmpty()); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + + @Test (timeout=60000) public void testAMRMClient() throws YarnRemoteException, IOException { - AMRMClientImpl amClient = null; + AMRMClientImpl amClient = null; try { // start am rm client - amClient = new AMRMClientImpl(attemptId); + amClient = new AMRMClientImpl(attemptId); amClient.init(conf); amClient.start(); @@ -156,36 +244,27 @@ public void testAMRMClient() throws YarnRemoteException, IOException { } } } - - - private void testAllocation(final AMRMClientImpl amClient) + + private void testAllocation(final AMRMClientImpl amClient) throws YarnRemoteException, IOException { // setup container request - final Resource capability = Records.newRecord(Resource.class); - final Priority priority = Records.newRecord(Priority.class); - priority.setPriority(0); - capability.setMemory(1024); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - final String[] nodes = { node }; - final String[] racks = { rack }; assertTrue(amClient.ask.size() == 0); assertTrue(amClient.release.size() == 0); - amClient.addContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 1)); - amClient.addContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 3)); - amClient.removeContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 2)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 1)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 3)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); int containersRequestedNode = amClient.remoteRequestsTable.get(priority) - .get(node).get(capability).getNumContainers(); + .get(node).get(capability).remoteRequest.getNumContainers(); int containersRequestedRack = amClient.remoteRequestsTable.get(priority) - .get(rack).get(capability).getNumContainers(); + .get(rack).get(capability).remoteRequest.getNumContainers(); int containersRequestedAny = amClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).getNumContainers(); + .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); assertTrue(containersRequestedNode == 2); assertTrue(containersRequestedRack == 2); @@ -221,8 +300,8 @@ private void testAllocation(final AMRMClientImpl amClient) assertTrue(amClient.ask.size() == 0); // need to tell the AMRMClient that we dont need these resources anymore - amClient.removeContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 2)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); assertTrue(amClient.ask.size() == 3); // send 0 container count request for resources that are no longer needed ResourceRequest snoopRequest = amClient.ask.iterator().next(); @@ -241,8 +320,9 @@ private void testAllocation(final AMRMClientImpl amClient) new Answer() { public AllocateResponse answer(InvocationOnMock invocation) throws Exception { - amClient.removeContainerRequest(new ContainerRequest(capability, - nodes, racks, priority, 2)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, + racks, priority, 2)); throw new Exception(); } }); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index ff2c0a4..1037936 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -48,6 +50,7 @@ private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class); + @SuppressWarnings("unchecked") @Test(timeout=10000) public void testAMRMClientAsync() throws Exception { Configuration conf = new Configuration(); @@ -65,7 +68,7 @@ public void testAMRMClientAsync() throws Exception { new ArrayList(), new ArrayList()); TestCallbackHandler callbackHandler = new TestCallbackHandler(); - AMRMClient client = mock(AMRMClient.class); + AMRMClient client = mock(AMRMClientImpl.class); final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false); when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer() { @Override @@ -78,7 +81,8 @@ public AllocateResponse answer(InvocationOnMock invocation) when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) .thenReturn(null); - AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler); + AMRMClientAsync asyncClient = + new AMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); asyncClient.registerApplicationMaster("localhost", 1234, null); @@ -110,6 +114,73 @@ public AllocateResponse answer(InvocationOnMock invocation) Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); } + @Test(timeout=10000) + public void testAMRMClientAsyncException() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + String exStr = "TestException"; + YarnRemoteException mockException = mock(YarnRemoteException.class); + when(mockException.getMessage()).thenReturn(exStr); + when(client.allocate(anyFloat())).thenThrow(mockException); + + AMRMClientAsync asyncClient = + new AMRMClientAsync(client, 20, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.savedException == null) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr)); + + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); + } + + @Test(timeout=10000) + public void testAMRMClientAsyncReboot() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + final AllocateResponse rebootResponse = createAllocateResponse( + new ArrayList(), new ArrayList()); + rebootResponse.setReboot(true); + when(client.allocate(anyFloat())).thenReturn(rebootResponse); + + AMRMClientAsync asyncClient = + new AMRMClientAsync(client, 20, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.reboot == false) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); + } + private AllocateResponse createAllocateResponse( List completed, List allocated) { AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated, @@ -120,6 +191,11 @@ private AllocateResponse createAllocateResponse( private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler { private volatile List completedContainers; private volatile List allocatedContainers; + Exception savedException = null; + boolean reboot = false; + Object notifier = new Object(); + + int callbackCount = 0; public List takeCompletedContainers() { List ret = completedContainers; @@ -176,9 +252,28 @@ public void onContainersAllocated(List containers) { } @Override - public void onRebootRequest() {} + public void onRebootRequest() { + reboot = true; + synchronized (notifier) { + notifier.notifyAll(); + } + } @Override public void onNodesUpdated(List updatedNodes) {} + + @Override + public float getProgress() { + callbackCount++; + return 0.5f; + } + + @Override + public void onError(Exception e) { + savedException = e; + synchronized (notifier) { + notifier.notifyAll(); + } + } } }