diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index b8ef2637474..07bcd7cbd4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -121,7 +122,8 @@ private void setupConfForCleanup(Configuration conf) { public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId, Set requestPriorities, Set allocationRequestIds, - RMWSConsts.ActivitiesGroupBy groupBy) { + RMWSConsts.ActivitiesGroupBy groupBy, int limit, boolean summarize, + double maxTimeInSeconds) { RMApp app = rmContext.getRMApps().get(applicationId); if (app != null && app.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) { @@ -140,6 +142,17 @@ public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId, allocations = new ArrayList(curAllocations); } } + if (summarize && allocations != null) { + AppAllocation summaryAppAllocation = + getSummarizedAppAllocation(allocations, maxTimeInSeconds); + if (summaryAppAllocation != null) { + allocations = Lists.newArrayList(summaryAppAllocation); + } + } + if (allocations != null && limit > 0 && limit < allocations.size()) { + allocations = + allocations.subList(allocations.size() - limit, allocations.size()); + } return new AppActivitiesInfo(allocations, applicationId, groupBy); } else { return new AppActivitiesInfo( @@ -148,6 +161,47 @@ public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId, } } + /** + * Get summarized app allocation from multiple allocations as follows: + * 1. Collect latest allocation attempts on nodes to construct an allocation + * summary on nodes from multiple app allocations which are recorded a few + * seconds before the last allocation. + * 2. Copy other fields from the last allocation. + */ + private AppAllocation getSummarizedAppAllocation( + List allocations, double maxTimeInSeconds) { + if (allocations == null || allocations.isEmpty()) { + return null; + } + long startTime = allocations.get(allocations.size() - 1).getTime() + - (long) (maxTimeInSeconds * 1000); + Map nodeActivities = new HashMap<>(); + for (int i = allocations.size() - 1; i >= 0; i--) { + AppAllocation appAllocation = allocations.get(i); + if (startTime > appAllocation.getTime()) { + break; + } + List activityNodes = appAllocation.getAllocationAttempts(); + for (ActivityNode an : activityNodes) { + if (an.getNodeId() != null) { + nodeActivities.putIfAbsent( + an.getRequestPriority() + "_" + an.getAllocationRequestId() + "_" + + an.getNodeId(), an); + } + } + } + AppAllocation lastAppAllocation = allocations.get(allocations.size() - 1); + AppAllocation summarizedAppAllocation = + new AppAllocation(lastAppAllocation.getPriority(), null, + lastAppAllocation.getQueueName()); + summarizedAppAllocation + .updateAppContainerStateAndTime(null, lastAppAllocation.getAppState(), + lastAppAllocation.getTime(), lastAppAllocation.getDiagnostic()); + summarizedAppAllocation + .setAllocationAttempts(new ArrayList<>(nodeActivities.values())); + return summarizedAppAllocation; + } + public ActivitiesInfo getActivitiesInfo(String nodeId, RMWSConsts.ActivitiesGroupBy groupBy) { List allocations; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java index 69d6ccf218b..e226b50fb77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java @@ -84,11 +84,8 @@ public ActivityState getAppState() { return appState; } - public String getPriority() { - if (priority == null) { - return null; - } - return priority.toString(); + public Priority getPriority() { + return priority; } public String getContainerId() { @@ -128,4 +125,8 @@ public AppAllocation filterAllocationAttempts(Set requestPriorities, .collect(Collectors.toList()); return appAllocation; } + + public void setAllocationAttempts(List allocationAttempts) { + this.allocationAttempts = allocationAttempts; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java index b7a60087e64..2a07ca86056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java @@ -237,6 +237,8 @@ public static final String GROUP_BY = "groupBy"; public static final String SIGNAL = "signal"; public static final String COMMAND = "command"; + public static final String ACTIONS = "actions"; + public static final String SUMMARIZE = "summarize"; private RMWSConsts() { // not called @@ -250,4 +252,13 @@ private RMWSConsts() { public enum ActivitiesGroupBy { DIAGNOSTIC } + + /** + * Defines the required action of app activities: + * UPDATE means app activities need to be updated, + * GET means the required app activities should be involved in response. + */ + public enum AppActivitiesRequiredAction { + UPDATE, GET + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java index 3aa2593c1c2..a5bd93bbbed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java @@ -227,11 +227,16 @@ ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, * the activities. It is a QueryParam. * @param groupBy the groupBy type by which the activities should be * aggregated. It is a QueryParam. + * @param limit set a limit of the result. It is a QueryParam. + * @param actions the required actions of app activities. It is a QueryParam. + * @param summarize whether app activities in multiple scheduling processes + * need to be summarized. It is a QueryParam. * @return all the activities about a specific app for a specific time */ AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy); + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize); /** * This method retrieves all the statistics for a specific app, and it is diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 3f010350cb5..3cff6dc2a70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -236,6 +236,7 @@ public static final String DEFAULT_START_TIME = "0"; public static final String DEFAULT_END_TIME = "-1"; public static final String DEFAULT_INCLUDE_RESOURCE = "false"; + public static final String DEFAULT_SUMMARIZE = "false"; @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; @@ -722,7 +723,11 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set requestPriorities, @QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS) Set allocationRequestIds, - @QueryParam(RMWSConsts.GROUP_BY) String groupBy) { + @QueryParam(RMWSConsts.GROUP_BY) String groupBy, + @QueryParam(RMWSConsts.LIMIT) String limit, + @QueryParam(RMWSConsts.ACTIONS) Set actions, + @QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE) + boolean summarize) { initForReadableEndpoints(); YarnScheduler scheduler = rm.getRMContext().getScheduler(); @@ -749,6 +754,26 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, return new AppActivitiesInfo(e.getMessage(), appId); } + Set requiredActions; + try { + requiredActions = parseAppActivitiesRequiredActions(actions); + } catch (IllegalArgumentException e) { + return new AppActivitiesInfo(e.getMessage(), appId); + } + + int limitNum = -1; + if (limit != null) { + try { + limitNum = Integer.parseInt(limit); + if (limitNum <= 0) { + return new AppActivitiesInfo( + "limit must be greater than 0!", appId); + } + } catch (NumberFormatException e) { + return new AppActivitiesInfo("limit must be integer!", appId); + } + } + double maxTime = 3.0; if (time != null) { @@ -762,12 +787,21 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, ApplicationId applicationId; try { applicationId = ApplicationId.fromString(appId); - activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime); - AppActivitiesInfo appActivitiesInfo = - activitiesManager.getAppActivitiesInfo(applicationId, - requestPriorities, allocationRequestIds, activitiesGroupBy); - - return appActivitiesInfo; + if (requiredActions + .contains(RMWSConsts.AppActivitiesRequiredAction.UPDATE)) { + activitiesManager + .turnOnAppActivitiesRecording(applicationId, maxTime); + } + if (requiredActions + .contains(RMWSConsts.AppActivitiesRequiredAction.GET)) { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(applicationId, requestPriorities, + allocationRequestIds, activitiesGroupBy, limitNum, + summarize, maxTime); + return appActivitiesInfo; + } + return new AppActivitiesInfo("Successfully notified actions: " + + StringUtils.join(',', actions), appId); } catch (Exception e) { String errMessage = "Cannot find application with given appId"; LOG.error(errMessage, e); @@ -778,6 +812,29 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, return null; } + private Set + parseAppActivitiesRequiredActions(Set actions) { + Set requiredActions = + new HashSet<>(); + if (actions == null || actions.isEmpty()) { + requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.UPDATE); + requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.GET); + } else { + for (String action : actions) { + if (!EnumUtils.isValidEnum(RMWSConsts.AppActivitiesRequiredAction.class, + action.toUpperCase())) { + String errMesasge = + "Got invalid action: " + action + ", valid actions: " + Arrays + .asList(RMWSConsts.AppActivitiesRequiredAction.values()); + throw new IllegalArgumentException(errMesasge); + } + requiredActions.add(RMWSConsts.AppActivitiesRequiredAction + .valueOf(action.toUpperCase())); + } + } + return requiredActions; + } + private RMWSConsts.ActivitiesGroupBy parseActivitiesGroupBy(String groupBy) { if (groupBy != null) { if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java index da2be57184e..d84bc8ba742 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java @@ -53,7 +53,8 @@ this.requestAllocation = new ArrayList<>(); this.nodeId = allocation.getNodeId(); this.queueName = allocation.getQueueName(); - this.appPriority = allocation.getPriority(); + this.appPriority = allocation.getPriority() == null ? + null : allocation.getPriority().toString(); this.timestamp = allocation.getTime(); this.dateTime = new Date(allocation.getTime()).toString(); this.allocationState = allocation.getAppState().name(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java index 495c7e248b0..2bf6b23ed70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.SystemClock; @@ -286,18 +288,124 @@ public void testAppActivitiesTTL() throws Exception { ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); } AppActivitiesInfo appActivitiesInfo = newActivitiesManager - .getAppActivitiesInfo(app.getApplicationId(), null, null, null); + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + false, 3); Assert.assertEquals(numActivities, appActivitiesInfo.getAllocations().size()); // sleep until all app activities expired Thread.sleep(cleanupIntervalMs + appActivitiesTTL); // there should be no remaining app activities appActivitiesInfo = newActivitiesManager - .getAppActivitiesInfo(app.getApplicationId(), null, null, null); + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + false, 3); Assert.assertEquals(0, appActivitiesInfo.getAllocations().size()); } + @Test (timeout = 30000) + public void testAppActivitiesPerformance() { + // start recording activities for first app + SchedulerApplicationAttempt app = apps.get(0); + FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0); + activitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 100); + int numActivities = 100; + int numNodes = 10000; + int testingTimes = 10; + for (int ano = 0; ano < numActivities; ano++) { + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node, + SystemClock.getInstance().getTime(), app); + for (int i = 0; i < numNodes; i++) { + NodeId nodeId = NodeId.newInstance("host" + i, 0); + activitiesManager + .addSchedulingActivityForApp(app.getApplicationId(), null, "0", + ActivityState.SKIPPED, + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, "container", + nodeId, "0"); + } + ActivitiesLogger.APP + .finishAllocatedAppAllocationRecording(activitiesManager, + app.getApplicationId(), null, ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); + } + + // It often take a longer time for the first query, ignore this distraction + activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + true, 100); + + // Test getting normal app activities + Supplier normalSupplier = () -> { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + false, 100); + Assert.assertEquals(numActivities, + appActivitiesInfo.getAllocations().size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .size()); + Assert.assertEquals(numNodes, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().size()); + return null; + }; + testManyTimes("Getting normal app activities", normalSupplier, + testingTimes); + + // Test getting aggregated app activities + Supplier aggregatedSupplier = () -> { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, + RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, false, 100); + Assert.assertEquals(numActivities, + appActivitiesInfo.getAllocations().size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().size()); + Assert.assertEquals(numNodes, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().get(0).getNodeIds().size()); + return null; + }; + testManyTimes("Getting aggregated app activities", aggregatedSupplier, + testingTimes); + + // Test getting summarized app activities + Supplier summarizedSupplier = () -> { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, + RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, true, 100); + Assert.assertEquals(1, appActivitiesInfo.getAllocations().size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().size()); + Assert.assertEquals(numNodes, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().get(0).getNodeIds().size()); + return null; + }; + testManyTimes("Getting summarized app activities", summarizedSupplier, + testingTimes); + } + + private void testManyTimes(String testingName, + Supplier supplier, int testingTimes) { + long totalTime = 0; + for (int i = 0; i < testingTimes; i++) { + long startTime = System.currentTimeMillis(); + supplier.get(); + totalTime += System.currentTimeMillis() - startTime; + } + System.out.println("#" + testingName + ", testing times : " + testingTimes + + ", total cost time : " + totalTime + " ms, average cost time : " + + (float) totalTime / testingTimes + " ms."); + } + /** * Testing activities manager which can record all history information about * node allocations. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java index 7650f7acf1f..991690b88ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -1064,4 +1064,222 @@ public void testAppFilterByRequestPrioritiesAndAllocationRequestIds() rm.stop(); } } + + @Test(timeout = 30000) + public void testAppLimit() throws Exception { + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024); + try { + RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(RMWSConsts.SCHEDULER_APP_ACTIVITIES); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", + json.getString("diagnostic")); + + // am1 asks for 1 * 5GB container + am1.allocate("*", 5120, 1, new ArrayList<>()); + // trigger scheduling triple, there will be 3 app activities in cache + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + // query all app activities without limit + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 3); + + // query all app activities with limit > 3 + params.putSingle(RMWSConsts.LIMIT, "10"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 3); + + // query app activities with limit = 2 + params.putSingle(RMWSConsts.LIMIT, "2"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 2); + + // query app activities with limit = 1 + params.putSingle(RMWSConsts.LIMIT, "1"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 1); + + // query all app activities with invalid limit + params.putSingle(RMWSConsts.LIMIT, "STRING"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("limit must be integer!", json.getString("diagnostic")); + + // query all app activities with limit = 0 + params.putSingle(RMWSConsts.LIMIT, "0"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("limit must be greater than 0!", + json.getString("diagnostic")); + + // query all app activities with limit < 0 + params.putSingle(RMWSConsts.LIMIT, "-3"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("limit must be greater than 0!", + json.getString("diagnostic")); + } finally { + rm.stop(); + } + } + + @Test(timeout = 30000) + public void testAppActions() throws Exception { + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024); + try { + RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + // am1 asks for 10 * 512MB container + am1.allocate("*", 512, 10, new ArrayList<>()); + + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(RMWSConsts.SCHEDULER_APP_ACTIVITIES); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + params.add("maxTime", 1); //only last for 1 second + + // testing invalid action + params.add(RMWSConsts.ACTIONS, "get"); + params.add(RMWSConsts.ACTIONS, "invalid-action"); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); + assertTrue(json.getString("diagnostic").startsWith("Got invalid action")); + + /* + * testing get action + */ + params.putSingle(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", json.getString("diagnostic")); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + // app activities won't be recorded + params.putSingle(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", json.getString("diagnostic")); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + /* + * testing update action + */ + params.putSingle(RMWSConsts.ACTIONS, "update"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("Successfully notified actions: update", + json.getString("diagnostic")); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + // app activities should be recorded + params.putSingle(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 1); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + /* + * testing update and get actions + */ + params.remove(RMWSConsts.ACTIONS); + params.add(RMWSConsts.ACTIONS, "update"); + params.add(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 1); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + // more app activities should be recorded + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 2); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + // more app activities should be recorded + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 3); + } finally { + rm.stop(); + } + } + + @Test(timeout=30000) + public void testAppSummary() throws Exception { + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 4 * 1024); + MockNM nm3 = rm.registerNode("127.0.0.3:1234", 4 * 1024); + + try { + RMApp app1 = rm.submitApp(5120, "app1", "user1", null, "b1"); + + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster") + .path("scheduler/app-activities").queryParams(params) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("waiting for display", + json.getString("diagnostic")); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + // am1 asks for 1 * 5GB container + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(5 * 1024), 1)), null); + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm2.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm3.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + params.add(RMWSConsts.SUMMARIZE, "true"); + params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler/app-activities").queryParams(params) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 3); + } finally { + rm.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index 7e6f3062521..bf0dee6c872 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -192,7 +192,8 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { // time and appId are specified inside hsr return RouterWebServiceUtil.genericForward(webAppAddress, hsr, AppActivitiesInfo.class, HTTPMethods.GET, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 1c8b7a85f29..1ed5f5929d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -1146,7 +1146,8 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { throw new NotImplementedException("Code is not implemented"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index 9327c6f688d..93275476555 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -95,6 +95,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_SUMMARIZE; + /** * RouterWebServices is a service that runs on each router that can be used to * intercept and inspect {@link RMWebServiceProtocol} messages from client to @@ -465,11 +467,16 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set requestPriorities, @QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS) Set allocationRequestIds, - @QueryParam(RMWSConsts.GROUP_BY) String groupBy) { + @QueryParam(RMWSConsts.GROUP_BY) String groupBy, + @QueryParam(RMWSConsts.LIMIT) String limit, + @QueryParam(RMWSConsts.ACTIONS) Set actions, + @QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE) + boolean summarize) { init(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time, - requestPriorities, allocationRequestIds, groupBy); + requestPriorities, allocationRequestIds, groupBy, limit, actions, + summarize); } @GET diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 535c579a85d..78aab5a961b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -180,8 +180,8 @@ protected ActivitiesInfo getActivities(String user) protected AppActivitiesInfo getAppActivities(String user) throws IOException, InterruptedException { - return routerWebService.getAppActivities( - createHttpServletRequest(user), null, null, null, null, null); + return routerWebService.getAppActivities(createHttpServletRequest(user), + null, null, null, null, null, null, null, false); } protected ApplicationStatisticsInfo getAppStatistics(String user) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java index f93b397e386..50200ed2b71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java @@ -141,7 +141,8 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { return new AppActivitiesInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java index 126610cc475..eb7222f9f3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java @@ -169,9 +169,11 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { return getNextInterceptor().getAppActivities(hsr, appId, time, - requestPriorities, allocationRequestIds, groupBy); + requestPriorities, allocationRequestIds, groupBy, limit, + actions, summarize); } @Override