diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index 03552e4..1b26cd3 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -90,7 +90,7 @@ private ApplicationReport getUnknownApplicationReport() { return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, - YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + YarnConfiguration.DEFAULT_APPLICATION_TYPE, null, null); } NotRunningJob(ApplicationReport applicationReport, JobState jobState) { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index 7b47711..38a1b80 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -470,9 +471,10 @@ private ApplicationReport getFinishedApplicationReport() { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( appId, 0); return ApplicationReport.newInstance(appId, attemptId, "user", "queue", - "appname", "host", 124, null, YarnApplicationState.FINISHED, - "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, - "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLE, null); } private ApplicationReport getRunningApplicationReport(String host, int port) { @@ -480,9 +482,11 @@ private ApplicationReport getRunningApplicationReport(String host, int port) { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( appId, 0); return ApplicationReport.newInstance(appId, attemptId, "user", "queue", - "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics", - "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, - YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", host, port, null, YarnApplicationState.RUNNING, + "diagnostics", + "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, + YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLE, null); } private ResourceMgrDelegate getRMDelegate() throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index 0854fdb..d49a3ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -40,6 +40,7 @@ *
  • {@link YarnApplicationState} of the application.
  • *
  • Diagnostic information in case of errors.
  • *
  • Start time of the application.
  • + *
  • {@link LogAggregationState} of the application.
  • *
  • Client {@link Token} of the application (if security is enabled).
  • * *

    @@ -58,7 +59,8 @@ public static ApplicationReport newInstance(ApplicationId applicationId, YarnApplicationState state, String diagnostics, String url, long startTime, long finishTime, FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, String origTrackingUrl, - float progress, String applicationType, Token amRmToken) { + float progress, String applicationType, + LogAggregationState logAggregationState, Token amRmToken) { ApplicationReport report = Records.newRecord(ApplicationReport.class); report.setApplicationId(applicationId); report.setCurrentApplicationAttemptId(applicationAttemptId); @@ -78,6 +80,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId, report.setOriginalTrackingUrl(origTrackingUrl); report.setProgress(progress); report.setApplicationType(applicationType); + report.setLogAggregationState(logAggregationState); report.setAMRMToken(amRmToken); return report; } @@ -349,4 +352,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId, @Stable public abstract Token getAMRMToken(); + /** + * Get the application's log aggregation state + * @return application's log aggregation state + */ + @Public + @Stable + public abstract LogAggregationState getLogAggregationState(); + + @Private + @Stable + public abstract void setLogAggregationState( + LogAggregationState logAggregationState); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java new file mode 100644 index 0000000..b94c61e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + * Enumeration of various states of log aggregation. + */ + +@Public +@Stable +public enum LogAggregationState { + /**The log aggregation is disabled. */ + DISABLE, + + /** The log aggregation is not started. */ + NOT_START, + + /** The log aggregation is in progress. */ + IN_PROGRESS, + + /** The log aggregation is failed. */ + FAILED, + + /** The log aggregation is finished. */ + COMPLETED +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3192306..adcde9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -151,7 +151,8 @@ message ApplicationReportProto { optional ApplicationAttemptIdProto currentApplicationAttemptId = 16; optional float progress = 17; optional string applicationType = 18; - optional hadoop.common.TokenProto am_rm_token = 19; + optional LogAggregationStateProto logAggregationState = 19; + optional hadoop.common.TokenProto am_rm_token = 20; } enum NodeStateProto { @@ -180,7 +181,13 @@ message NodeReportProto { optional int64 last_health_report_time = 9; } - +enum LogAggregationStateProto { + LOG_AGGREGATION_DISABLE = 0; + LOG_AGGREGATION_NOT_START = 1; + LOG_AGGREGATION_IN_PROGRESS = 2; + LOG_AGGREGATION_FAILED = 3; + LOG_AGGREGATION_COMPLETED = 4; +} //////////////////////////////////////////////////////////////////////// ////// From AM_RM_Protocol ///////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 155ba5d..6ec4644 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index a7b7d65..37c5f3e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -278,6 +278,8 @@ private void printApplicationReport(String applicationId) appReportStr.println(appReport.getYarnApplicationState()); appReportStr.print("\tFinal-State : "); appReportStr.println(appReport.getFinalApplicationStatus()); + appReportStr.print("\tLOG-AGGREGATION-State : "); + appReportStr.println(appReport.getLogAggregationState()); appReportStr.print("\tTracking-URL : "); appReportStr.println(appReport.getOriginalTrackingUrl()); appReportStr.print("\tRPC Port : "); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 826433d..3fd1eba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -271,7 +272,8 @@ public void setYarnApplicationState(YarnApplicationState state) { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_START, null); List applicationReports = new ArrayList(); applicationReports.add(newApplicationReport); @@ -282,7 +284,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user2", "queue2", "appname2", "host2", 125, null, YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", - null); + LogAggregationState.COMPLETED, null); applicationReports.add(newApplicationReport2); ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); @@ -291,7 +293,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user3", "queue3", "appname3", "host3", 126, null, YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", - null); + LogAggregationState.NOT_START, null); applicationReports.add(newApplicationReport3); ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); @@ -302,7 +304,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user4", "queue4", "appname4", "host4", 127, null, YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, - "NON-MAPREDUCE", null); + "NON-MAPREDUCE", LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport4); return applicationReports; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 1d08f24..5bdd0f7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -84,7 +85,8 @@ public void testGetApplicationReport() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.DISABLE, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); int result = cli.run(new String[] { "-status", applicationId.toString() }); @@ -103,6 +105,7 @@ public void testGetApplicationReport() throws Exception { pw.println("\tProgress : 53.79%"); pw.println("\tState : FINISHED"); pw.println("\tFinal-State : SUCCEEDED"); + pw.println("\tLOG-AGGREGATION-State : DISABLE"); pw.println("\tTracking-URL : N/A"); pw.println("\tRPC Port : 124"); pw.println("\tAM Host : host"); @@ -138,7 +141,8 @@ public void testGetApplications() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_START, null); List applicationReports = new ArrayList(); applicationReports.add(newApplicationReport); @@ -147,8 +151,8 @@ public void testGetApplications() throws Exception { applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2), "user2", "queue2", "appname2", "host2", 125, null, YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", - null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", + LogAggregationState.COMPLETED, null); applicationReports.add(newApplicationReport2); ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); @@ -157,7 +161,7 @@ public void testGetApplications() throws Exception { "user3", "queue3", "appname3", "host3", 126, null, YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", - null); + LogAggregationState.NOT_START, null); applicationReports.add(newApplicationReport3); ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); @@ -166,7 +170,7 @@ public void testGetApplications() throws Exception { "user4", "queue4", "appname4", "host4", 127, null, YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, "NON-MAPREDUCE", - null); + LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport4); ApplicationId applicationId5 = ApplicationId.newInstance(1234, 9); @@ -175,7 +179,7 @@ public void testGetApplications() throws Exception { "user5", "queue5", "appname5", "host5", 128, null, YarnApplicationState.ACCEPTED, "diagnostics5", "url5", 5, 5, FinalApplicationStatus.KILLED, null, "N/A", 0.93789f, "HIVE", - null); + LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport5); ApplicationId applicationId6 = ApplicationId.newInstance(1234, 10); @@ -184,7 +188,7 @@ public void testGetApplications() throws Exception { "user6", "queue6", "appname6", "host6", 129, null, YarnApplicationState.SUBMITTED, "diagnostics6", "url6", 6, 6, FinalApplicationStatus.KILLED, null, "N/A", 0.99789f, "PIG", - null); + LogAggregationState.IN_PROGRESS, null); applicationReports.add(newApplicationReport6); // Test command yarn application -list @@ -546,7 +550,8 @@ public void testKillApplication() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.COMPLETED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport2); int result = cli.run(new String[] { "-kill", applicationId.toString() }); @@ -559,7 +564,8 @@ public void testKillApplication() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_START, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); result = cli.run(new String[] { "-kill", applicationId.toString() }); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index 9716f74..b65e18f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.TextFormat; @@ -516,4 +518,34 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + + @Override + public LogAggregationState getLogAggregationState() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if(!p.hasLogAggregationState()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationState()); + } + + private LogAggregationState convertFromProtoFormat( + LogAggregationStateProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + @Override + public void setLogAggregationState( + LogAggregationState logAggregationState) { + maybeInitBuilder(); + if (logAggregationState == null) { + builder.clearLogAggregationState(); + return; + } + builder.setLogAggregationState(convertToProtoFormat(logAggregationState)); + } + + private LogAggregationStateProto convertToProtoFormat( + LogAggregationState logAggregationState) { + return ProtoUtils.convertToProtoFormat(logAggregationState); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index b660f7d..6a9cb34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto; @@ -206,4 +208,15 @@ public static ApplicationAccessType convertFromProtoFormat( return ApplicationAccessType.valueOf(e.name().replace( APP_ACCESS_TYPE_PREFIX, "")); } + + /* + * LogAggregationState + */ + private static String LOG_AGGREGATION_PREFIX = "LOG_AGGREGATION_"; + public static LogAggregationStateProto convertToProtoFormat(LogAggregationState e) { + return LogAggregationStateProto.valueOf(LOG_AGGREGATION_PREFIX + e.name()); + } + public static LogAggregationState convertFromProtoFormat(LogAggregationStateProto e) { + return LogAggregationState.valueOf(e.name().replace(LOG_AGGREGATION_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java index 9302d4b..685bf45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assert; @@ -56,9 +57,10 @@ protected static ApplicationReport createApplicationReport( ApplicationAttemptId.newInstance(appId, appAttemptIdInt); ApplicationReport appReport = ApplicationReport.newInstance(appId, appAttemptId, "user", "queue", - "appname", "host", 124, null, YarnApplicationState.FINISHED, - "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, - "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLE, null); return appReport; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerLogAggregationStatus.java new file mode 100644 index 0000000..cd7cd92 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerLogAggregationStatus.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.util.Records; + +/** + *

    ContainerLogAggregationStatus is a data structure of the + * log aggregation state for the Container.

    + * + *

    It includes information such as: + *

      + *
    • ContainerId
    • + *
    • log aggregation state for this container
    • + *
    + *

    + * + * @see NodeStatus + */ + +public abstract class ContainerLogAggregationStatus { + + @Private + public static ContainerLogAggregationStatus newInstance( + ContainerId containerId, LogAggregationState logAggregationState) { + ContainerLogAggregationStatus status = + Records.newRecord(ContainerLogAggregationStatus.class); + status.setContainerId(containerId); + status.setLogAggregationState(logAggregationState); + return status; + } + + @Public + @Stable + public abstract ContainerId getContainerId(); + + @Private + @Unstable + public abstract void setContainerId(ContainerId containerId); + + @Public + @Stable + public abstract LogAggregationState getLogAggregationState(); + + @Private + @Unstable + public abstract void setLogAggregationState( + LogAggregationState logAggregationState); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 8e98703..0f9be73 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -41,4 +41,9 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract List + getContainerLogAggregationStatus(); + public abstract void setContainerLogAggregationStatus( + List containerLogAggregationStatus); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerLogAggregationStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerLogAggregationStatusPBImpl.java new file mode 100644 index 0000000..a1b0ae8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerLogAggregationStatusPBImpl.java @@ -0,0 +1,127 @@ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerLogAggregationStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerLogAggregationStatusProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; +import com.google.protobuf.TextFormat; + + +public class ContainerLogAggregationStatusPBImpl extends ContainerLogAggregationStatus{ + ContainerLogAggregationStatusProto proto = ContainerLogAggregationStatusProto.getDefaultInstance(); + ContainerLogAggregationStatusProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId = null; + + public ContainerLogAggregationStatusPBImpl() { + builder = ContainerLogAggregationStatusProto.newBuilder(); + } + + public ContainerLogAggregationStatusPBImpl (ContainerLogAggregationStatusProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerLogAggregationStatusProto getProto() { + mergeLocalToProto(); + this.proto = this.viaProto ? this.proto : this.builder.build(); + this.viaProto = true; + return this.proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToProto() { + if (this.viaProto) + maybeInitBuilder(); + this.proto = this.builder.build(); + + this.viaProto = true; + } + + private void maybeInitBuilder() { + if (this.viaProto || this.builder == null) { + this.builder = ContainerLogAggregationStatusProto.newBuilder(this.proto); + } + this.viaProto = false; + } + + @Override + public ContainerId getContainerId() { + ContainerLogAggregationStatusProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerId != null) { + return this.containerId; + } + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + + return this.containerId; + } + + private ContainerId convertFromProtoFormat(ContainerIdProto proto) { + return new ContainerIdPBImpl(proto); + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) + builder.clearContainerId(); + this.containerId = containerId; + } + + @Override + public LogAggregationState getLogAggregationState() { + ContainerLogAggregationStatusProtoOrBuilder p = viaProto ? proto : builder; + if(!p.hasLogAggregationState()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationState()); + } + + private LogAggregationState convertFromProtoFormat( + LogAggregationStateProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + @Override + public void setLogAggregationState(LogAggregationState logAggregationState) { + maybeInitBuilder(); + if (logAggregationState == null) { + builder.clearLogAggregationState(); + return; + } + builder.setLogAggregationState(convertToProtoFormat(logAggregationState)); + } + + private LogAggregationStateProto convertToProtoFormat( + LogAggregationState logAggregationState) { + return ProtoUtils.convertToProtoFormat(logAggregationState); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8ed7849..e472379 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -33,9 +33,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerLogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -50,6 +52,7 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; + private List logAggregationStatus = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -310,4 +313,38 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { private ApplicationIdProto convertToProtoFormat(ApplicationId c) { return ((ApplicationIdPBImpl)c).getProto(); } + + @Override + public List getContainerLogAggregationStatus() { + initContainerLogAggregationStatus(); + return this.logAggregationStatus; + } + + @Override + public void setContainerLogAggregationStatus( + List containerLogAggregationStatus) { + if (containerLogAggregationStatus == null) { + builder.clearContainerLogAggregationStatus(); + } + this.logAggregationStatus = containerLogAggregationStatus; + } + + private synchronized void initContainerLogAggregationStatus() { + if (this.logAggregationStatus != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getContainerLogAggregationStatusList(); + this.logAggregationStatus = new ArrayList(); + + for (ContainerLogAggregationStatusProto c : list) { + this.logAggregationStatus.add(convertFromProtoFormat(c)); + } + } + + private ContainerLogAggregationStatusPBImpl convertFromProtoFormat( + ContainerLogAggregationStatusProto c) { + return new ContainerLogAggregationStatusPBImpl(c); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 1b2a03e..00d9488 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -312,7 +313,8 @@ public static ApplicationReport newApplicationReport( String url, long startTime, long finishTime, FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, String origTrackingUrl, - float progress, String appType, Token amRmToken) { + float progress, String appType, LogAggregationState logAggregationState, + Token amRmToken) { ApplicationReport report = recordFactory .newRecordInstance(ApplicationReport.class); report.setApplicationId(applicationId); @@ -333,6 +335,7 @@ public static ApplicationReport newApplicationReport( report.setOriginalTrackingUrl(origTrackingUrl); report.setProgress(progress); report.setApplicationType(appType); + report.setLogAggregationState(logAggregationState); report.setAMRMToken(amRmToken); return report; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..a62b2af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -30,12 +30,18 @@ enum NodeActionProto { SHUTDOWN = 2; } +message ContainerLogAggregationStatusProto { + optional ContainerIdProto container_id = 1; + optional LogAggregationStateProto logAggregationState = 2; +} + message NodeStatusProto { optional NodeIdProto node_id = 1; optional int32 response_id = 2; repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated ContainerLogAggregationStatusProto containerLogAggregationStatus = 6; } message MasterKeyProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 7995fb3..42c2f71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -337,7 +337,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse, + remoteNodeStatus.getContainerLogAggregationStatus())); return nodeHeartBeatResponse; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..8a42302 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -211,4 +213,16 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Update the logAggregation state for the container that launched for this RMApp + * @param status the ContainerLogAggregationStatus + */ + void updateLogAggregationStatus(ContainerLogAggregationStatus status); + + /** + * Return the log aggregation State + * @return the log aggregation State of the RMApp + */ + LogAggregationState getLogAggregationState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083c..f41faaa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -21,10 +21,12 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -40,7 +42,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -50,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -97,7 +102,11 @@ = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); + private final Map containerLogAggregationStatus = + new HashMap(); private final String applicationType; + private final AtomicInteger LogAggregationCompleted = new AtomicInteger(0); + private final AtomicInteger LogAggregationFailed = new AtomicInteger(0); // Mutable fields private long startTime; @@ -511,7 +520,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, this.name, host, rpcPort, clientToAMToken, createApplicationState(), diags, trackingUrl, this.startTime, this.finishTime, finishState, - appUsageReport, origTrackingUrl, progress, this.applicationType, + appUsageReport, origTrackingUrl, progress, this.applicationType, + getLogAggregationState(), amrmToken); } finally { this.readLock.unlock(); @@ -911,6 +921,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { super.transition(app, event); } } + private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -932,6 +943,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { @SuppressWarnings("unchecked") public void transition(RMAppImpl app, RMAppEvent event) { + if (app.getCurrentAppAttempt() != null) { + app.initialContainerLogAggregationStatus(app.getCurrentAppAttempt() + .getAllContainers()); + } Set nodes = getNodesOnWhichAttemptRan(app); for (NodeId nodeId : nodes) { app.handler.handle( @@ -1017,4 +1032,63 @@ public YarnApplicationState createApplicationState() { throw new YarnRuntimeException("Unknown state passed!"); } } + + @Override + public void updateLogAggregationStatus(ContainerLogAggregationStatus status) { + this.writeLock.lock(); + try { + if (containerLogAggregationStatus.containsKey(status.getContainerId())) { + LogAggregationState currentState = + containerLogAggregationStatus.get(status.getContainerId()); + if (currentState != LogAggregationState.COMPLETED + && currentState != LogAggregationState.FAILED) { + if (status.getLogAggregationState() == LogAggregationState.COMPLETED) { + LogAggregationCompleted.getAndAdd(1); + } else if (status.getLogAggregationState() == LogAggregationState.FAILED) { + LogAggregationFailed.getAndAdd(1); + } + containerLogAggregationStatus.put(status.getContainerId(), + status.getLogAggregationState()); + } + } + } finally { + this.writeLock.unlock(); + } + } + + private void + initialContainerLogAggregationStatus(Set containers) { + for (ContainerId container : containers) { + containerLogAggregationStatus.put(container, + LogAggregationState.NOT_START); + } + } + + @Override + public LogAggregationState getLogAggregationState() { + this.readLock.lock(); + try { + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + return LogAggregationState.DISABLE; + } + RMAppState currentState = getState(); + if (currentState != RMAppState.FINISHED + && currentState != RMAppState.KILLED + && currentState != RMAppState.FAILED) { + return LogAggregationState.NOT_START; + } + if (LogAggregationFailed.get() > 0 + || containerLogAggregationStatus.size() == 0) { + return LogAggregationState.FAILED; + } else if (LogAggregationCompleted.get() == containerLogAggregationStatus + .size()) { + return LogAggregationState.COMPLETED; + } else { + return LogAggregationState.IN_PROGRESS; + } + } finally { + this.readLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 335dbda..64f3b76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -178,4 +179,10 @@ * @return the start time of the application. */ long getStartTime(); + + /** + * get All the containers which are launched for this RMAppAttempt + * @return set of containerIds + */ + Set getAllContainers(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f741a6e..0ecce0c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -137,6 +137,7 @@ new HashSet(); private final List justFinishedContainers = new ArrayList(); + private final Set containers = new HashSet(); private Container masterContainer; private float progress = 0; @@ -1411,6 +1412,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // Normal container.Put it in completedcontainers list appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.containers.add(containerStatus.getContainerId()); return RMAppAttemptState.RUNNING; } } @@ -1454,6 +1456,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } // Normal container. appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.containers.add(containerStatus.getContainerId()); return RMAppAttemptState.FINISHING; } } @@ -1587,4 +1590,17 @@ private void removeCredentials(RMAppAttemptImpl appAttempt) { private static String sanitizeTrackingUrl(String url) { return (url == null || url.trim().isEmpty()) ? "N/A" : url; } + + @Override + public Set getAllContainers() { + this.readLock.lock(); + try { + if (this.masterContainer != null) { + this.containers.add(this.masterContainer.getId()); + } + return this.containers; + } finally { + this.readLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7429100..2a062f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -47,11 +47,13 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -637,6 +639,11 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { statusEvent.getKeepAliveAppIds()); } + if (statusEvent.getContainerLogAggregationStatus() != null + && !statusEvent.getContainerLogAggregationStatus().isEmpty()) { + rmNode.sendContainerLogAggregationStatus(statusEvent + .getContainerLogAggregationStatus()); + } return NodeState.RUNNING; } } @@ -683,6 +690,19 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return latestContainerInfoList; } + private void sendContainerLogAggregationStatus( + List containerLogAggregationStatus) { + for(ContainerLogAggregationStatus status : containerLogAggregationStatus) { + RMApp app = + context.getRMApps().get( + status.getContainerId().getApplicationAttemptId() + .getApplicationId()); + if (app != null) { + app.updateLogAggregationStatus(status); + } + } + } + @VisibleForTesting public void setNextHeartBeat(boolean nextHeartBeat) { this.nextHeartBeat = nextHeartBeat; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index abfacbb..78cc4dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; public class RMNodeStatusEvent extends RMNodeEvent { @@ -32,15 +33,18 @@ private final List containersCollection; private final NodeHeartbeatResponse latestResponse; private final List keepAliveAppIds; + private final List logAggregationStatus; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, - NodeHeartbeatResponse latestResponse) { + NodeHeartbeatResponse latestResponse, + List logAggregationStatus) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; + this.logAggregationStatus = logAggregationStatus; } public NodeHealthStatus getNodeHealthStatus() { @@ -58,4 +62,8 @@ public NodeHeartbeatResponse getLatestResponse() { public List getKeepAliveAppIds() { return this.keepAliveAppIds; } + + public List getContainerLogAggregationStatus() { + return this.logAggregationStatus; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java new file mode 100644 index 0000000..ac46726 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java @@ -0,0 +1,454 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.TestRMAppTransitions; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +public class TestLogAggregationStatus { + static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); + + private boolean isSecurityEnabled; + private Configuration conf; + private RMContext rmContext; + private static int maxAppAttempts = + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; + private static int container_id = 1; + private ApplicationId applicationId = MockApps.newAppID(1); + private DrainDispatcher rmDispatcher; + private RMStateStore store; + private YarnScheduler scheduler; + private final int max_wait_time = 20; + private Container amContainer; + + // ignore all the RM application attempt events + private static final class TestApplicationAttemptEventDispatcher implements + EventHandler { + + private final RMContext rmContext; + public TestApplicationAttemptEventDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMAppAttemptEvent event) { + ApplicationId appId = event.getApplicationAttemptId().getApplicationId(); + RMApp rmApp = this.rmContext.getRMApps().get(appId); + if (rmApp != null) { + try { + rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + appId, t); + } + } + } + } + + // handle all the RM application events - same as in ResourceManager.java + private static final class TestApplicationEventDispatcher implements + EventHandler { + + private final RMContext rmContext; + public TestApplicationEventDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMAppEvent event) { + ApplicationId appID = event.getApplicationId(); + RMApp rmApp = this.rmContext.getRMApps().get(appID); + if (rmApp != null) { + try { + rmApp.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + appID, t); + } + } + } + } + + // handle all the RM application manager events - same as in + // ResourceManager.java + private static final class TestApplicationManagerEventDispatcher implements + EventHandler { + @Override + public void handle(RMAppManagerEvent event) { + } + } + + // handle all the scheduler events - same as in ResourceManager.java + private static final class TestSchedulerEventDispatcher implements + EventHandler { + @Override + public void handle(SchedulerEvent event) { + } + } + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE; + if (isSecurityEnabled) { + authMethod = AuthenticationMethod.KERBEROS; + } + SecurityUtil.setAuthenticationMethod(authMethod, conf); + UserGroupInformation.setConfiguration(conf); + + rmDispatcher = new DrainDispatcher(); + ContainerAllocationExpirer containerAllocationExpirer = + mock(ContainerAllocationExpirer.class); + AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); + AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); + store = mock(RMStateStore.class); + this.rmContext = + new RMContextImpl(rmDispatcher, store, + containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, + null, new AMRMTokenSecretManager(conf), + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM()); + + rmDispatcher.register(RMAppAttemptEventType.class, + new TestApplicationAttemptEventDispatcher(this.rmContext)); + + rmDispatcher.register(RMAppEventType.class, + new TestApplicationEventDispatcher(rmContext)); + + rmDispatcher.register(RMAppManagerEventType.class, + new TestApplicationManagerEventDispatcher()); + + rmDispatcher.register(SchedulerEventType.class, + new TestSchedulerEventDispatcher()); + + ApplicationMasterLauncher mockAMLauncher = mock(ApplicationMasterLauncher.class); + doAnswer(new Answer() { + + public Object answer(InvocationOnMock invocation) { + return null; + } + }).when(mockAMLauncher).handle(any(AMLauncherEvent.class)); + rmDispatcher.register(AMLauncherEventType.class, mockAMLauncher); + rmDispatcher.init(conf); + rmDispatcher.start(); + } + + @Test (timeout = 5000) + public void testLogAggregationDisableState() { + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.DISABLE); + } + + @Test (timeout = 5000) + public void testLogAggregationNotStartState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.NOT_START); + } + + @Test (timeout = 5000) + public void testLogAggregationInProgressState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.IN_PROGRESS); + List status = + new ArrayList(); + List ids = + new ArrayList(app.getCurrentAppAttempt() + .getAllContainers()); + status.add(ContainerLogAggregationStatus.newInstance(ids.get(0), + LogAggregationState.IN_PROGRESS)); + for(int i=1; i < ids.size(); i++) { + status.add(ContainerLogAggregationStatus.newInstance(ids.get(i), + LogAggregationState.COMPLETED)); + } + rmNodeKickOffLogAggregationUpdate(status); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.IN_PROGRESS); + } + + @Test (timeout = 5000) + public void testLogAggregationCompletedState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + List status = + new ArrayList(); + for (ContainerId containerId : app.getCurrentAppAttempt() + .getAllContainers()) { + ContainerLogAggregationStatus containerStatus = + ContainerLogAggregationStatus.newInstance(containerId, + LogAggregationState.COMPLETED); + status.add(containerStatus); + } + rmNodeKickOffLogAggregationUpdate(status); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.COMPLETED); + } + + @Test (timeout = 5000) + public void testLogAggregationFailedState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + List status = + new ArrayList(); + List ids = + new ArrayList(app.getCurrentAppAttempt() + .getAllContainers()); + status.add(ContainerLogAggregationStatus.newInstance(ids.get(0), + LogAggregationState.IN_PROGRESS)); + status.add(ContainerLogAggregationStatus.newInstance(ids.get(1), + LogAggregationState.FAILED)); + for(int i=2; i < ids.size(); i++) { + status.add(ContainerLogAggregationStatus.newInstance(ids.get(i), + LogAggregationState.COMPLETED)); + } + rmNodeKickOffLogAggregationUpdate(status); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.FAILED); + } + private void rmNodeKickOffLogAggregationUpdate(List status) { + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + NodeHealthStatus healthStatus = NodeHealthStatus.newInstance(true, null, 1000); + NodeHeartbeatResponse lastResponse = Records.newRecord(NodeHeartbeatResponse.class); + RMNodeStatusEvent event = + new RMNodeStatusEvent(nodeId, healthStatus, new ArrayList(), null, lastResponse, + status); + node.handle(event); + } + + @SuppressWarnings("unchecked") + private RMApp createRunningApp( + ApplicationSubmissionContext submissionContext, Configuration conf) { + String user = MockApps.newUserName(); + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + // ensure max application attempts set to known value + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts); + scheduler = mock(YarnScheduler.class); + + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + + if (submissionContext == null) { + submissionContext = new ApplicationSubmissionContextPBImpl(); + } + // applicationId will not be used because RMStateStore is mocked, + // but applicationId is still set for safety + submissionContext.setApplicationId(applicationId); + + RMApp application = + new RMAppImpl(applicationId, rmContext, conf, name, user, queue, + submissionContext, scheduler, masterService, + System.currentTimeMillis(), "YARN"); + this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), + application); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.START); + application.handle(event); + assertAppState(RMAppState.NEW_SAVING, application); + event = + new RMAppNewSavedEvent(application.getApplicationId(), null); + application.handle(event); + assertAppState(RMAppState.SUBMITTED, application); + RMAppAttempt currentAttempt = application.getCurrentAppAttempt(); + int times = 0; + while (currentAttempt.getAppAttemptState() + != RMAppAttemptState.SUBMITTED && times <= max_wait_time) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + Assert.assertTrue(currentAttempt. + getAppAttemptState() == RMAppAttemptState.SUBMITTED); + currentAttempt.handle( + new RMAppAttemptEvent(currentAttempt.getAppAttemptId(), + RMAppAttemptEventType.APP_ACCEPTED)); + assertAppState(RMAppState.ACCEPTED, application); + amContainer = createAMContainer(currentAttempt); + Allocation allocation = mock(Allocation.class); + when(allocation.getContainers()).thenReturn( + Collections.singletonList(amContainer)); + when(allocation.getContainers()). + thenReturn(Collections.singletonList(amContainer)); + when( + scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), + any(List.class), any(List.class), any(List.class))).thenReturn( + allocation); + currentAttempt.handle( + new RMAppAttemptContainerAllocatedEvent(currentAttempt + .getAppAttemptId(), amContainer)); + currentAttempt + .handle( + new RMAppAttemptNewSavedEvent(currentAttempt + .getAppAttemptId(), null)); + currentAttempt.handle( + new RMAppAttemptEvent(currentAttempt + .getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); + currentAttempt.handle( + new RMAppAttemptRegistrationEvent(currentAttempt + .getAppAttemptId(), null, 0, null)); + + times = 0; + while (application.getState() != RMAppState.RUNNING + && times <= max_wait_time) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + assertAppState(RMAppState.RUNNING, application); + Assert.assertTrue(currentAttempt.getAppAttemptState() == + RMAppAttemptState.RUNNING); + // Kick off the RMAppAttemptContainerFinishedEvent to add finished containers + processRMAppAttemptContainerFinishedEvent(currentAttempt, container_id++); + processRMAppAttemptContainerFinishedEvent(currentAttempt, container_id++); + processRMAppAttemptContainerFinishedEvent(currentAttempt, container_id++); + processRMAppAttemptContainerFinishedEvent(currentAttempt, container_id++); + return application; + } + + private static void assertAppState(RMAppState state, RMApp application) { + Assert.assertEquals("application state should have been " + state, + state, application.getState()); + } + + private RMApp moveToFinishedState(RMApp application) { + RMAppAttempt current = application.getCurrentAppAttempt(); + current.handle(new RMAppAttemptUnregistrationEvent(current + .getAppAttemptId(), null, FinalApplicationStatus.SUCCEEDED, "")); + ContainerStatus status = + ContainerStatus.newInstance(amContainer.getId(), + ContainerState.COMPLETE, "", 0); + current.handle(new RMAppAttemptContainerFinishedEvent(current + .getAppAttemptId(), status)); + current.handle(new RMAppAttemptUpdateSavedEvent( + current.getAppAttemptId(), null)); + application.handle(new RMAppUpdateSavedEvent( + application.getApplicationId(), null)); + int times = 0; + while (application.getState() != RMAppState.FINISHED + && times <= max_wait_time) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + assertAppState(RMAppState.FINISHED, application); + return application; + } + + private Container createAMContainer(RMAppAttempt currentAttempt) { + Container amContainer = mock(Container.class); + Resource resource = BuilderUtils.newResource(2048, 1); + when(amContainer.getId()).thenReturn( + BuilderUtils.newContainerId(currentAttempt + .getAppAttemptId(), container_id ++)); + when(amContainer.getResource()).thenReturn(resource); + return amContainer; + } + + private void processRMAppAttemptContainerFinishedEvent(RMAppAttempt attempt, + int container_id) { + ApplicationAttemptId attemptId = attempt.getAppAttemptId(); + ContainerId containerId = + ContainerId.newInstance(attemptId, container_id); + + ContainerStatus status = + ContainerStatus.newInstance(containerId, ContainerState.COMPLETE, + "", 0); + attempt.handle(new RMAppAttemptContainerFinishedEvent( + attemptId, status)); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 479128a..7f9dc50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -462,7 +462,7 @@ private RMNodeImpl getUnhealthyNode() { NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); node.handle(new RMNodeStatusEvent(node.getNodeID(), status, - new ArrayList(), null, null)); + new ArrayList(), null, null, null)); Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 73b9cf7..49923c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -30,8 +30,10 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -153,6 +155,17 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public void + updateLogAggregationStatus(ContainerLogAggregationStatus status) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public LogAggregationState getLogAggregationState() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bcb2f6f..95938c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -28,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -226,4 +228,15 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { return null; } + + @Override + public void updateLogAggregationStatus(ContainerLogAggregationStatus status) { + // TODO Auto-generated method stub + } + + @Override + public LogAggregationState getLogAggregationState() { + // TODO Auto-generated method stub + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index b26c37f..9f806bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -149,7 +149,7 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException, NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false, "test health report", System.currentTimeMillis()); node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth, - new ArrayList(), null, null)); + new ArrayList(), null, null, null)); rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY); ClientResponse response =