diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index c039514..25621a0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -457,4 +457,18 @@ public abstract void setLogAggregationStatus( @Unstable public abstract void setApplicationTimeouts( List timeouts); + + /** + * Get the {@link ReservationId} that the application is submitted to. + * + * @return Application's ReservationId + */ + @Unstable + public abstract ReservationId getReservationId(); + + /** + * Set the {@link ReservationId} that the application is submitted to. + */ + @Unstable + public abstract void setReservationId(ReservationId reservationId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 43a661f..a99d1bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -215,6 +215,7 @@ message ApplicationReportProto { optional string appNodeLabelExpression = 24; optional string amNodeLabelExpression = 25; repeated ApplicationTimeoutProto application_timeouts = 26; + optional ReservationIdProto reservationId = 27; } message ApplicationTimeoutProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index e218036..83cadb5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -1288,6 +1289,33 @@ private ReservationSubmissionRequest submitReservationTestHelper( } @Test + public void testReservationIdInClusterApps() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + + ApplicationSubmissionContext context = + client.createApplication().getApplicationSubmissionContext(); + ReservationId reservationId = + submitReservationTestHelper(client, arrival, deadline, duration) + .getReservationId(); + context.setReservationID(reservationId); + context.setAMContainerSpec(ContainerLaunchContext.newInstance(new HashMap(), + new HashMap(), new ArrayList(), new HashMap(), null, new HashMap())); + context.setAMContainerResourceRequest(ResourceRequest.newInstance( + Priority.UNDEFINED, "", Resource.newInstance(1024, 1), 1)); + client.submitApplication(context); + ApplicationReport report = + client.getApplicationReport(context.getApplicationId()); + Assert.assertTrue(report.getReservationId().equals(reservationId)); + + client.stop(); + } + + @Test public void testCreateReservation() throws Exception { MiniYARNCluster cluster = setupMiniYARNCluster(); YarnClient client = setupYarnClient(cluster); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index f4987d3..89c4e6a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.TextFormat; @@ -576,6 +578,15 @@ private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + private ReservationIdPBImpl convertFromProtoFormat( + ReservationIdProto r) { + return new ReservationIdPBImpl(r); + } + + private ReservationIdProto convertToProtoFormat(ReservationId r) { + return ((ReservationIdPBImpl) r).getProto(); + } + @Override public LogAggregationStatus getLogAggregationStatus() { ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; @@ -751,4 +762,22 @@ public void setApplicationTimeouts(List timeouts) { } this.applicationTimeoutList = timeouts; } + + public ReservationId getReservationId() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasReservationId()) { + return null; + } + return convertFromProtoFormat(p.getReservationId()); + } + + @Override + public void setReservationId(ReservationId reservationId) { + maybeInitBuilder(); + if (reservationId == null) { + builder.clearReservationId(); + return; + } + builder.setReservationId(convertToProtoFormat(reservationId)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0bf5f51..69b3ec5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -804,6 +804,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0)); } report.setApplicationTimeouts(Collections.singletonList(timeout)); + report.setReservationId(this.getReservationId()); return report; } finally { this.readLock.unlock(); @@ -2058,4 +2059,4 @@ public Priority getApplicationPriority() { public void setApplicationPriority(Priority applicationPriority) { this.applicationPriority = applicationPriority; } -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 2d364f4..c66a159 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -71,6 +72,7 @@ protected String user; protected String name; protected String queue; + protected String reservationId; protected YarnApplicationState state; protected FinalApplicationStatus finalStatus; protected float progress; @@ -156,6 +158,10 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess, this.priority = app.getApplicationPriority() .getPriority(); } + ReservationId reservationId = app.getReservationId(); + if (reservationId != null) { + this.reservationId = app.getReservationId().toString(); + } this.progress = app.getProgress() * 100; this.diagnostics = app.getDiagnostics().toString(); if (diagnostics == null || diagnostics.isEmpty()) { @@ -442,6 +448,10 @@ public String getAmNodeLabelExpression() { return this.amNodeLabelExpression; } + public String getReservationId() { + return this.reservationId; + } + public ResourcesInfo getResourceInfo() { return resourceInfo; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index ea573e2..7f65a7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -40,8 +40,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; @@ -254,8 +258,9 @@ public void waitForState(ApplicationId appId, RMAppState finalState) break; } - LOG.info("App : " + appId + " State is : " + app.getState() + - " Waiting for state : " + finalState); + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalState + " Because of: " + + app.getDiagnostics()); Thread.sleep(WAIT_MS_PER_LOOP); timeWaiting += WAIT_MS_PER_LOOP; } @@ -490,6 +495,18 @@ public RMApp submitApp(int masterMemory) throws Exception { return submitApp(masterMemory, false); } + public RMApp submitApp(int masterMemory, String name, + ReservationId reservationId, String queue) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp(resource, name, + UserGroupInformation.getCurrentUser().getShortUserName(), null, false, + queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), + null, null, true, false, false, null, 0, null, true, null, "", null, + reservationId); + } + public RMApp submitApp(int masterMemory, Priority priority) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() @@ -652,8 +669,23 @@ public RMApp submitApp(Resource capability, String name, String user, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, Priority priority, String amLabel, - Map applicationTimeouts) - throws Exception { + Map applicationTimeouts) throws Exception { + return submitApp(capability, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, priority, amLabel, + applicationTimeouts, null); + } + + public RMApp submitApp(Resource capability, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, String amLabel, + Map applicationTimeouts, + ReservationId reservationId) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -667,6 +699,7 @@ public RMApp submitApp(Resource capability, String name, String user, .newRecord(ApplicationSubmissionContext.class); sub.setKeepContainersAcrossApplicationAttempts(keepContainers); sub.setApplicationId(appId); + sub.setReservationID(reservationId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); if (applicationTimeouts != null && applicationTimeouts.size() > 0) { @@ -1129,6 +1162,19 @@ public ApplicationReport getApplicationReport(ApplicationId appId) return response.getApplicationReport(); } + public ReservationId getNewReservation() throws YarnException, IOException { + ApplicationClientProtocol client = getClientRMService(); + return client.getNewReservation(GetNewReservationRequest.newInstance()) + .getReservationId(); + } + + public void submitReservation(ReservationDefinition definition, String queue, + ReservationId reservationId) throws YarnException, IOException { + ApplicationClientProtocol client = getClientRMService(); + client.submitReservation(ReservationSubmissionRequest + .newInstance(definition, queue, reservationId)); + } + public void updateReservationState(ReservationUpdateRequest request) throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java index 657bec4..44e675d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesReservation.java @@ -47,11 +47,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; @@ -92,7 +94,7 @@ private static MockRM rm; - private static final int MINIMUM_RESOURCE_DURATION = 1000000; + private static final int MINIMUM_RESOURCE_DURATION = 10000; private static final Clock clock = new UTCClock(); private static final String TEST_DIR = new File(System.getProperty( "test.build.data", "/tmp")).getAbsolutePath(); @@ -929,11 +931,7 @@ public void testExcludeResourceAllocations() throws Exception { @Test public void testDeleteReservation() throws JSONException, Exception { rm.start(); - for (int i = 0; i < 100; i++) { - MockNM amNodeManager = - rm.registerNode("127.0.0." + i + ":1234", 100 * 1024); - amNodeManager.nodeHeartbeat(true); - } + setupCluster(100); ReservationId rid = getReservationIdTestHelper(1); @@ -945,6 +943,32 @@ public void testDeleteReservation() throws JSONException, Exception { rm.stop(); } + @Test + public void testReservationIdInClusterApps() throws Exception { + rm.start(); + setupCluster(100); + + ReservationId rid = getReservationIdTestHelper(1); + ClientResponse response = reservationSubmissionTestHelper( + "reservation/submit", MediaType.APPLICATION_JSON, clock.getTime(), + "res1", rid); + + if (this.isAuthenticationEnabled()) { + assertTrue(isHttpSuccessResponse(response)); + rm.getRMContext().getReservationSystem().synchronizePlan(DEFAULT_QUEUE, + false); + RMApp app = rm.submitApp(1024, "new_app", rid, DEFAULT_QUEUE); + ClientResponse response2 = getApplicationReportHelper( + "apps/" + app.getApplicationId().toString(), + MediaType.APPLICATION_JSON); + AppInfo info = response2.getEntity(AppInfo.class); + + assertTrue(info.getReservationId().equals(rid.toString())); + } + + rm.stop(); + } + /** * This method is used when a ReservationId is required. Attempt to use REST * API. If authentication is not enabled, ensure that the response status is @@ -996,9 +1020,10 @@ private ClientResponse reservationSubmissionTestHelper(String path, ReservationId reservationId) throws Exception { String reservationJson = loadJsonFile("submit-reservation.json"); - String reservationJsonRequest = String.format(reservationJson, - reservationId.toString(), arrival, arrival + MINIMUM_RESOURCE_DURATION, - reservationName); + String reservationJsonRequest = + String.format(reservationJson, reservationId.toString(), arrival, + arrival + MINIMUM_RESOURCE_DURATION, reservationName, + (int) Math.floor(0.9 * MINIMUM_RESOURCE_DURATION)); return submitAndVerifyReservation(path, media, reservationJsonRequest); } @@ -1025,6 +1050,18 @@ private ClientResponse submitAndVerifyReservation(String path, String media, return response; } + private ClientResponse getApplicationReportHelper(String path, String media) + throws Exception { + ClientResponse response = + constructWebResource(path).accept(media).get(ClientResponse.class); + + if (!this.isAuthenticationEnabled()) { + assertResponseStatusCode(Status.UNAUTHORIZED, response.getStatusInfo()); + } + + return response; + } + private void updateReservationTestHelper(String path, ReservationId reservationId, String media) throws JSONException, Exception { @@ -1119,8 +1156,8 @@ private void testRDLHelper(JSONObject json) throws JSONException { ("reservation-request-interpreter"); assertEquals("0", type); - assertEquals(60, requests.getJSONArray("reservation-request") - .getJSONObject(0).getInt("duration")); + assertEquals((int) Math.floor(0.9 * MINIMUM_RESOURCE_DURATION), + requests.getJSONObject("reservation-request").getInt("duration")); } private JSONObject testListReservationHelper(WebResource resource) throws diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json index 580c599..b1b4bf3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/submit-reservation.json @@ -9,22 +9,13 @@ "reservation-request-interpreter" : 0, "reservation-request" : [ { - "duration" : 60, + "duration" : %s, "num-containers" : 220, "min-concurrency" : 220, "capability" : { "memory" : 1024, "vCores" : 1 } - }, - { - "duration" : 120, - "num-containers" : 110, - "min-concurrency" : 110, - "capability" : { - "memory" : 1024, - "vCores" : 1 - } } ] }