diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index 33116a4..c98a3b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -447,4 +447,18 @@ public abstract void setLogAggregationStatus( @Unstable public abstract void setAmNodeLabelExpression(String amNodeLabelExpression); + + /** + * Get the {@link ReservationId} that the application is submitted to. + * + * @return Application's ReservationId + */ + @Unstable + public abstract ReservationId getReservationId(); + + /** + * Set the {@link ReservationId} that the application is submitted to. + */ + @Unstable + public abstract void setReservationId(ReservationId reservationId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fd..19419d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -212,6 +212,7 @@ message ApplicationReportProto { optional PriorityProto priority = 23; optional string appNodeLabelExpression = 24; optional string amNodeLabelExpression = 25; + optional ReservationIdProto reservationId = 26; } enum LogAggregationStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 19966ad..417b968 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -1267,6 +1268,33 @@ private ReservationSubmissionRequest submitReservationTestHelper( } @Test + public void testReservationIdInClusterApps() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + + ApplicationSubmissionContext context = + client.createApplication().getApplicationSubmissionContext(); + ReservationId reservationId = + submitReservationTestHelper(client, arrival, deadline, duration) + .getReservationId(); + context.setReservationID(reservationId); + context.setAMContainerSpec(ContainerLaunchContext.newInstance(new HashMap(), + new HashMap(), new ArrayList(), new HashMap(), null, new HashMap())); + context.setAMContainerResourceRequest(ResourceRequest.newInstance( + Priority.UNDEFINED, "", Resource.newInstance(1024, 1), 1)); + client.submitApplication(context); + ApplicationReport report = + client.getApplicationReport(context.getApplicationId()); + Assert.assertTrue(report.getReservationId().equals(reservationId)); + + client.stop(); + } + + @Test public void testCreateReservation() throws Exception { MiniYARNCluster cluster = setupMiniYARNCluster(); YarnClient client = setupYarnClient(cluster); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index 1072815..d4d93ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.TextFormat; @@ -567,6 +569,15 @@ private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + private ReservationIdPBImpl convertFromProtoFormat( + ReservationIdProto r) { + return new ReservationIdPBImpl(r); + } + + private ReservationIdProto convertToProtoFormat(ReservationId r) { + return ((ReservationIdPBImpl) r).getProto(); + } + @Override public LogAggregationStatus getLogAggregationStatus() { ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; @@ -668,4 +679,23 @@ public void setAmNodeLabelExpression(String amNodeLabelExpression) { } builder.setAmNodeLabelExpression((amNodeLabelExpression)); } + + @Override + public ReservationId getReservationId() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasReservationId()) { + return null; + } + return convertFromProtoFormat(p.getReservationId()); + } + + @Override + public void setReservationId(ReservationId reservationId) { + maybeInitBuilder(); + if (reservationId == null) { + builder.clearReservationId(); + return; + } + builder.setReservationId(convertToProtoFormat(reservationId)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0fdc311..db49705 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -771,6 +771,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, report.setUnmanagedApp(submissionContext.getUnmanagedAM()); report.setAppNodeLabelExpression(getAppNodeLabelExpression()); report.setAmNodeLabelExpression(getAmNodeLabelExpression()); + report.setReservationId(this.getReservationId()); return report; } finally { this.readLock.unlock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 3bd6cff..0402bf2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -69,6 +70,7 @@ protected String user; protected String name; protected String queue; + protected String reservationId; protected YarnApplicationState state; protected FinalApplicationStatus finalStatus; protected float progress; @@ -152,6 +154,10 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess, this.priority = appSubmissionContext.getPriority() .getPriority(); } + ReservationId reservationId = app.getReservationId(); + if (reservationId != null) { + this.reservationId = app.getReservationId().toString(); + } this.progress = app.getProgress() * 100; this.diagnostics = app.getDiagnostics().toString(); if (diagnostics == null || diagnostics.isEmpty()) { @@ -406,6 +412,10 @@ public String getAmNodeLabelExpression() { return this.amNodeLabelExpression; } + public String getReservationId() { + return this.reservationId; + } + public ResourcesInfo getResourceInfo() { return resourceInfo; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 58bb721..efb2c1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -39,8 +39,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -60,6 +62,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; @@ -186,8 +190,9 @@ public void waitForState(ApplicationId appId, RMAppState finalState) break; } - LOG.info("App : " + appId + " State is : " + app.getState() + - " Waiting for state : " + finalState); + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState + " Because of: " + + app.getDiagnostics()); Thread.sleep(WAIT_MS_PER_LOOP); timeWaiting += WAIT_MS_PER_LOOP; } @@ -417,6 +422,18 @@ public RMApp submitApp(int masterMemory) throws Exception { return submitApp(masterMemory, false); } + public RMApp submitApp(int masterMemory, String name, + ReservationId reservationId, String queue) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp(resource, name, + UserGroupInformation.getCurrentUser().getShortUserName(), null, false, + queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), + null, null, true, false, false, null, 0, null, true, null, "", null, + reservationId); + } + public RMApp submitApp(int masterMemory, Priority priority) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() @@ -579,8 +596,23 @@ public RMApp submitApp(Resource capability, String name, String user, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, Priority priority, String amLabel, - Map applicationTimeouts) - throws Exception { + Map applicationTimeouts) throws Exception { + return submitApp(capability, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, priority, amLabel, + applicationTimeouts, null); + } + + public RMApp submitApp(Resource capability, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, String amLabel, + Map applicationTimeouts, + ReservationId reservationId) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -594,6 +626,7 @@ public RMApp submitApp(Resource capability, String name, String user, .newRecord(ApplicationSubmissionContext.class); sub.setKeepContainersAcrossApplicationAttempts(keepContainers); sub.setApplicationId(appId); + sub.setReservationID(reservationId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); if (applicationTimeouts != null && applicationTimeouts.size() > 0) { @@ -1020,6 +1053,19 @@ public ApplicationReport getApplicationReport(ApplicationId appId) return response.getApplicationReport(); } + public ReservationId getNewReservation() throws YarnException, IOException { + ApplicationClientProtocol client = getClientRMService(); + return client.getNewReservation(GetNewReservationRequest.newInstance()) + .getReservationId(); + } + + public void submitReservation(ReservationDefinition definition, String queue, + ReservationId reservationId) throws YarnException, IOException { + ApplicationClientProtocol client = getClientRMService(); + client.submitReservation(ReservationSubmissionRequest + .newInstance(definition, queue, reservationId)); + } + public void updateReservationState(ReservationUpdateRequest request) throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java index 657bec4..44e675d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java @@ -47,11 +47,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; @@ -92,7 +94,7 @@ private static MockRM rm; - private static final int MINIMUM_RESOURCE_DURATION = 1000000; + private static final int MINIMUM_RESOURCE_DURATION = 10000; private static final Clock clock = new UTCClock(); private static final String TEST_DIR = new File(System.getProperty( "test.build.data", "/tmp")).getAbsolutePath(); @@ -929,11 +931,7 @@ public void testExcludeResourceAllocations() throws Exception { @Test public void testDeleteReservation() throws JSONException, Exception { rm.start(); - for (int i = 0; i < 100; i++) { - MockNM amNodeManager = - rm.registerNode("127.0.0." + i + ":1234", 100 * 1024); - amNodeManager.nodeHeartbeat(true); - } + setupCluster(100); ReservationId rid = getReservationIdTestHelper(1); @@ -945,6 +943,32 @@ public void testDeleteReservation() throws JSONException, Exception { rm.stop(); } + @Test + public void testReservationIdInClusterApps() throws Exception { + rm.start(); + setupCluster(100); + + ReservationId rid = getReservationIdTestHelper(1); + ClientResponse response = reservationSubmissionTestHelper( + "reservation/submit", MediaType.APPLICATION_JSON, clock.getTime(), + "res1", rid); + + if (this.isAuthenticationEnabled()) { + assertTrue(isHttpSuccessResponse(response)); + rm.getRMContext().getReservationSystem().synchronizePlan(DEFAULT_QUEUE, + false); + RMApp app = rm.submitApp(1024, "new_app", rid, DEFAULT_QUEUE); + ClientResponse response2 = getApplicationReportHelper( + "apps/" + app.getApplicationId().toString(), + MediaType.APPLICATION_JSON); + AppInfo info = response2.getEntity(AppInfo.class); + + assertTrue(info.getReservationId().equals(rid.toString())); + } + + rm.stop(); + } + /** * This method is used when a ReservationId is required. Attempt to use REST * API. If authentication is not enabled, ensure that the response status is @@ -996,9 +1020,10 @@ private ClientResponse reservationSubmissionTestHelper(String path, ReservationId reservationId) throws Exception { String reservationJson = loadJsonFile("submit-reservation.json"); - String reservationJsonRequest = String.format(reservationJson, - reservationId.toString(), arrival, arrival + MINIMUM_RESOURCE_DURATION, - reservationName); + String reservationJsonRequest = + String.format(reservationJson, reservationId.toString(), arrival, + arrival + MINIMUM_RESOURCE_DURATION, reservationName, + (int) Math.floor(0.9 * MINIMUM_RESOURCE_DURATION)); return submitAndVerifyReservation(path, media, reservationJsonRequest); } @@ -1025,6 +1050,18 @@ private ClientResponse submitAndVerifyReservation(String path, String media, return response; } + private ClientResponse getApplicationReportHelper(String path, String media) + throws Exception { + ClientResponse response = + constructWebResource(path).accept(media).get(ClientResponse.class); + + if (!this.isAuthenticationEnabled()) { + assertResponseStatusCode(Status.UNAUTHORIZED, response.getStatusInfo()); + } + + return response; + } + private void updateReservationTestHelper(String path, ReservationId reservationId, String media) throws JSONException, Exception { @@ -1119,8 +1156,8 @@ private void testRDLHelper(JSONObject json) throws JSONException { ("reservation-request-interpreter"); assertEquals("0", type); - assertEquals(60, requests.getJSONArray("reservation-request") - .getJSONObject(0).getInt("duration")); + assertEquals((int) Math.floor(0.9 * MINIMUM_RESOURCE_DURATION), + requests.getJSONObject("reservation-request").getInt("duration")); } private JSONObject testListReservationHelper(WebResource resource) throws diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json index 580c599..b1b4bf3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json @@ -9,22 +9,13 @@ "reservation-request-interpreter" : 0, "reservation-request" : [ { - "duration" : 60, + "duration" : %s, "num-containers" : 220, "min-concurrency" : 220, "capability" : { "memory" : 1024, "vCores" : 1 } - }, - { - "duration" : 120, - "num-containers" : 110, - "min-concurrency" : 110, - "capability" : { - "memory" : 1024, - "vCores" : 1 - } } ] }