diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index 03552e4..fc825cf 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -90,7 +91,7 @@ private ApplicationReport getUnknownApplicationReport() { return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, - YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + YarnConfiguration.DEFAULT_APPLICATION_TYPE, LogAggregationState.DISABLED, null); } NotRunningJob(ApplicationReport applicationReport, JobState jobState) { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index 7b47711..abbb92e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -470,9 +471,10 @@ private ApplicationReport getFinishedApplicationReport() { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( appId, 0); return ApplicationReport.newInstance(appId, attemptId, "user", "queue", - "appname", "host", 124, null, YarnApplicationState.FINISHED, - "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, - "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLED, null); } private ApplicationReport getRunningApplicationReport(String host, int port) { @@ -480,9 +482,11 @@ private ApplicationReport getRunningApplicationReport(String host, int port) { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( appId, 0); return ApplicationReport.newInstance(appId, attemptId, "user", "queue", - "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics", - "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, - YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", host, port, null, YarnApplicationState.RUNNING, + "diagnostics", + "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, + YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLED, null); } private ResourceMgrDelegate getRMDelegate() throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index 0854fdb..d49a3ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -40,6 +40,7 @@ *
  • {@link YarnApplicationState} of the application.
  • *
  • Diagnostic information in case of errors.
  • *
  • Start time of the application.
  • + *
  • {@link LogAggregationState} of the application.
  • *
  • Client {@link Token} of the application (if security is enabled).
  • * *

    @@ -58,7 +59,8 @@ public static ApplicationReport newInstance(ApplicationId applicationId, YarnApplicationState state, String diagnostics, String url, long startTime, long finishTime, FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, String origTrackingUrl, - float progress, String applicationType, Token amRmToken) { + float progress, String applicationType, + LogAggregationState logAggregationState, Token amRmToken) { ApplicationReport report = Records.newRecord(ApplicationReport.class); report.setApplicationId(applicationId); report.setCurrentApplicationAttemptId(applicationAttemptId); @@ -78,6 +80,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId, report.setOriginalTrackingUrl(origTrackingUrl); report.setProgress(progress); report.setApplicationType(applicationType); + report.setLogAggregationState(logAggregationState); report.setAMRMToken(amRmToken); return report; } @@ -349,4 +352,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId, @Stable public abstract Token getAMRMToken(); + /** + * Get the application's log aggregation state + * @return application's log aggregation state + */ + @Public + @Stable + public abstract LogAggregationState getLogAggregationState(); + + @Private + @Stable + public abstract void setLogAggregationState( + LogAggregationState logAggregationState); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java new file mode 100644 index 0000000..5a01938 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + * Enumeration of various states of log aggregation. + */ + +@Public +@Stable +public enum LogAggregationState { + /**The log aggregation is disabled. */ + DISABLED, + + /** The log aggregation is not started. */ + NOT_STARTED, + + /** The log aggregation is in progress. */ + IN_PROGRESS, + + /** The log aggregation is failed. */ + FAILED, + + /** The log aggregation is finished. */ + COMPLETED, + + /** The time which waits for log aggregation to finish + * exceeds the maximum wait time. + */ + TIME_OUT +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e..fa9fde5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -495,13 +495,20 @@ + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - /** + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. */ public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX + "log-aggregation.retain-seconds"; public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; + + /** + * How long to wait for log aggregation to finish, -1 disables. + */ + public static final String LOG_AGGREGATION_WATTING_MS = YARN_PREFIX + + "log-aggregation.waitting-ms"; + public static final long DEFAULT_LOG_AGGREGATION_WATTING_MS = 10 * 60 * 1000; /** * How long to wait between aggregated log retention checks. If set to diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9d4d59e..689ac59 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -156,7 +156,8 @@ message ApplicationReportProto { optional ApplicationAttemptIdProto currentApplicationAttemptId = 16; optional float progress = 17; optional string applicationType = 18; - optional hadoop.common.TokenProto am_rm_token = 19; + optional LogAggregationStateProto logAggregationState = 19; + optional hadoop.common.TokenProto am_rm_token = 20; } enum NodeStateProto { @@ -185,7 +186,14 @@ message NodeReportProto { optional int64 last_health_report_time = 9; } - +enum LogAggregationStateProto { + LOG_AGGREGATION_DISABLED = 0; + LOG_AGGREGATION_NOT_STARTED = 1; + LOG_AGGREGATION_IN_PROGRESS = 2; + LOG_AGGREGATION_FAILED = 3; + LOG_AGGREGATION_COMPLETED = 4; + LOG_AGGREGATION_TIME_OUT = 5; +} //////////////////////////////////////////////////////////////////////// ////// From AM_RM_Protocol ///////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index a7b7d65..37c5f3e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -278,6 +278,8 @@ private void printApplicationReport(String applicationId) appReportStr.println(appReport.getYarnApplicationState()); appReportStr.print("\tFinal-State : "); appReportStr.println(appReport.getFinalApplicationStatus()); + appReportStr.print("\tLOG-AGGREGATION-State : "); + appReportStr.println(appReport.getLogAggregationState()); appReportStr.print("\tTracking-URL : "); appReportStr.println(appReport.getOriginalTrackingUrl()); appReportStr.print("\tRPC Port : "); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 826433d..fdfd3fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -271,7 +272,8 @@ public void setYarnApplicationState(YarnApplicationState state) { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_STARTED, null); List applicationReports = new ArrayList(); applicationReports.add(newApplicationReport); @@ -282,7 +284,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user2", "queue2", "appname2", "host2", 125, null, YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", - null); + LogAggregationState.COMPLETED, null); applicationReports.add(newApplicationReport2); ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); @@ -291,7 +293,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user3", "queue3", "appname3", "host3", 126, null, YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", - null); + LogAggregationState.NOT_STARTED, null); applicationReports.add(newApplicationReport3); ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); @@ -302,7 +304,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user4", "queue4", "appname4", "host4", 127, null, YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, - "NON-MAPREDUCE", null); + "NON-MAPREDUCE", LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport4); return applicationReports; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 1d08f24..1cf857a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -84,7 +85,8 @@ public void testGetApplicationReport() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.DISABLED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); int result = cli.run(new String[] { "-status", applicationId.toString() }); @@ -103,6 +105,7 @@ public void testGetApplicationReport() throws Exception { pw.println("\tProgress : 53.79%"); pw.println("\tState : FINISHED"); pw.println("\tFinal-State : SUCCEEDED"); + pw.println("\tLOG-AGGREGATION-State : DISABLED"); pw.println("\tTracking-URL : N/A"); pw.println("\tRPC Port : 124"); pw.println("\tAM Host : host"); @@ -138,7 +141,8 @@ public void testGetApplications() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_STARTED, null); List applicationReports = new ArrayList(); applicationReports.add(newApplicationReport); @@ -147,8 +151,8 @@ public void testGetApplications() throws Exception { applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2), "user2", "queue2", "appname2", "host2", 125, null, YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", - null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", + LogAggregationState.COMPLETED, null); applicationReports.add(newApplicationReport2); ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); @@ -157,7 +161,7 @@ public void testGetApplications() throws Exception { "user3", "queue3", "appname3", "host3", 126, null, YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", - null); + LogAggregationState.NOT_STARTED, null); applicationReports.add(newApplicationReport3); ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); @@ -166,7 +170,7 @@ public void testGetApplications() throws Exception { "user4", "queue4", "appname4", "host4", 127, null, YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, "NON-MAPREDUCE", - null); + LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport4); ApplicationId applicationId5 = ApplicationId.newInstance(1234, 9); @@ -175,7 +179,7 @@ public void testGetApplications() throws Exception { "user5", "queue5", "appname5", "host5", 128, null, YarnApplicationState.ACCEPTED, "diagnostics5", "url5", 5, 5, FinalApplicationStatus.KILLED, null, "N/A", 0.93789f, "HIVE", - null); + LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport5); ApplicationId applicationId6 = ApplicationId.newInstance(1234, 10); @@ -184,7 +188,7 @@ public void testGetApplications() throws Exception { "user6", "queue6", "appname6", "host6", 129, null, YarnApplicationState.SUBMITTED, "diagnostics6", "url6", 6, 6, FinalApplicationStatus.KILLED, null, "N/A", 0.99789f, "PIG", - null); + LogAggregationState.IN_PROGRESS, null); applicationReports.add(newApplicationReport6); // Test command yarn application -list @@ -546,7 +550,8 @@ public void testKillApplication() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.COMPLETED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport2); int result = cli.run(new String[] { "-kill", applicationId.toString() }); @@ -559,7 +564,8 @@ public void testKillApplication() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_STARTED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); result = cli.run(new String[] { "-kill", applicationId.toString() }); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index 9716f74..b65e18f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.TextFormat; @@ -516,4 +518,34 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + + @Override + public LogAggregationState getLogAggregationState() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if(!p.hasLogAggregationState()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationState()); + } + + private LogAggregationState convertFromProtoFormat( + LogAggregationStateProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + @Override + public void setLogAggregationState( + LogAggregationState logAggregationState) { + maybeInitBuilder(); + if (logAggregationState == null) { + builder.clearLogAggregationState(); + return; + } + builder.setLogAggregationState(convertToProtoFormat(logAggregationState)); + } + + private LogAggregationStateProto convertToProtoFormat( + LogAggregationState logAggregationState) { + return ProtoUtils.convertToProtoFormat(logAggregationState); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index b660f7d..6a9cb34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto; @@ -206,4 +208,15 @@ public static ApplicationAccessType convertFromProtoFormat( return ApplicationAccessType.valueOf(e.name().replace( APP_ACCESS_TYPE_PREFIX, "")); } + + /* + * LogAggregationState + */ + private static String LOG_AGGREGATION_PREFIX = "LOG_AGGREGATION_"; + public static LogAggregationStateProto convertToProtoFormat(LogAggregationState e) { + return LogAggregationStateProto.valueOf(LOG_AGGREGATION_PREFIX + e.name()); + } + public static LogAggregationState convertFromProtoFormat(LogAggregationStateProto e) { + return LogAggregationState.valueOf(e.name().replace(LOG_AGGREGATION_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java index 9302d4b..b0e7aa4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assert; @@ -56,9 +57,10 @@ protected static ApplicationReport createApplicationReport( ApplicationAttemptId.newInstance(appId, appAttemptIdInt); ApplicationReport appReport = ApplicationReport.newInstance(appId, appAttemptId, "user", "queue", - "appname", "host", 124, null, YarnApplicationState.FINISHED, - "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, - "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLED, null); return appReport; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java new file mode 100644 index 0000000..782696c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.util.Records; + +/** + *

    ApplicationLogStatus is a data structure of the + * log aggregation state for the Application.

    + * + *

    It includes information such as: + *

      + *
    • ApplicationId
    • + *
    • LogAggregation State of this application
    • + *
    • Set of LogAggregationState for all containers
    • + *
    • Diagnostic information
    • + *
    + *

    + * + * @see NodeStatus + */ + +public abstract class ApplicationLogAggregationStatus { + @Private + public static ApplicationLogAggregationStatus newInstance(ApplicationId appId, + LogAggregationState appLogState, + Map containersLogStatus, String diagnostic) { + ApplicationLogAggregationStatus appLogStatus = + Records.newRecord(ApplicationLogAggregationStatus.class); + appLogStatus.setApplicationId(appId); + appLogStatus.setLogAggregationState(appLogState); + appLogStatus.setContainersLogStatus(containersLogStatus); + appLogStatus.setDiagnostic(diagnostic); + return appLogStatus; + } + + @Public + @Stable + public abstract ApplicationId getApplicationId(); + + @Private + @Unstable + public abstract void setApplicationId(ApplicationId applicationId); + + @Public + @Stable + public abstract LogAggregationState getLogAggregationState(); + + @Private + @Unstable + public abstract void setLogAggregationState( + LogAggregationState logAggregationState); + + @Public + @Stable + public abstract Map getContainersLogStatus(); + + @Private + @Unstable + public abstract void setContainersLogStatus( + Map containersLogStatus); + + @Public + @Stable + public abstract String getDiagnostic(); + + @Private + @Unstable + public abstract void setDiagnostic(String diagnostic); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 8e98703..09e1059 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -41,4 +41,9 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract List + getApplicationLogAggregationStatus(); + public abstract void setApplicationLogAggregationStatus( + List applicationLogAggregationStatus); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java new file mode 100644 index 0000000..7800851 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerIdLogAggregateStateMapProto; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; + +import com.google.protobuf.TextFormat; + +public class ApplicationLogAggregationStatusPBImpl extends ApplicationLogAggregationStatus{ + + ApplicationLogAggregationStatusProto proto = + ApplicationLogAggregationStatusProto.getDefaultInstance(); + ApplicationLogAggregationStatusProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Map containersLogStatus = null; + + public ApplicationLogAggregationStatusPBImpl() { + builder = ApplicationLogAggregationStatusProto.newBuilder(); + } + + public ApplicationLogAggregationStatusPBImpl(ApplicationLogAggregationStatusProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized ApplicationLogAggregationStatusProto getProto() { + mergeLocalToProto(); + this.proto = this.viaProto ? this.proto : this.builder.build(); + this.viaProto = true; + return this.proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private synchronized void mergeLocalToProto() { + if (this.viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + this.proto = this.builder.build(); + this.viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (this.viaProto || this.builder == null) { + this.builder = ApplicationLogAggregationStatusProto.newBuilder(this.proto); + } + this.viaProto = false; + } + + private synchronized void mergeLocalToBuilder() { + if (this.applicationId != null + && !((ApplicationIdPBImpl) this.applicationId).getProto().equals( + builder.getApplicationId())) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.containersLogStatus != null) { + addContainersLogStatusToProto(); + } + } + + private synchronized void addContainersLogStatusToProto() { + maybeInitBuilder(); + builder.clearContainersLogStatus(); + if (this.containersLogStatus == null) { + return; + } + Iterable values = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator contaierIdIterator = containersLogStatus + .keySet().iterator(); + + @Override + public boolean hasNext() { + return contaierIdIterator.hasNext(); + } + + @Override + public ContainerIdLogAggregateStateMapProto next() { + ContainerId key = contaierIdIterator.next(); + return ContainerIdLogAggregateStateMapProto.newBuilder() + .setLogAggregationState( + convertToProtoFormat(containersLogStatus.get(key))) + .setContainerId( + convertToProtoFormat(key)).build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllContainersLogStatus(values); + } + + private ContainerIdProto convertToProtoFormat(ContainerId containerId) { + return ((ContainerIdPBImpl) containerId).getProto(); + } + + private ContainerId convertFromProtoFormat(ContainerIdProto proto) { + return new ContainerIdPBImpl(proto); + } + + @Override + public synchronized ApplicationId getApplicationId() { + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + private ApplicationId convertFromProtoFormat(ApplicationIdProto proto) { + return new ApplicationIdPBImpl(proto); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId appId) { + return ((ApplicationIdPBImpl)appId).getProto(); + } + + @Override + public synchronized void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + } + this.applicationId = applicationId; + } + + @Override + public synchronized LogAggregationState getLogAggregationState() { + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasLogAggregationState()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationState()); + } + + private LogAggregationState convertFromProtoFormat( + LogAggregationStateProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + @Override + public synchronized void setLogAggregationState( + LogAggregationState logAggregationState) { + maybeInitBuilder(); + if (logAggregationState == null) { + builder.clearLogAggregationState(); + return; + } + builder.setLogAggregationState(convertToProtoFormat(logAggregationState)); + } + + private LogAggregationStateProto convertToProtoFormat( + LogAggregationState logAggregationState) { + return ProtoUtils.convertToProtoFormat(logAggregationState); + } + + @Override + public synchronized Map + getContainersLogStatus() { + initContainersLogStatus(); + return this.containersLogStatus; + } + + private synchronized void initContainersLogStatus() { + if (this.containersLogStatus != null) { + return; + } + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + List list = + p.getContainersLogStatusList(); + this.containersLogStatus = + new HashMap(list.size()); + for (ContainerIdLogAggregateStateMapProto pro : list) { + containersLogStatus.put(convertFromProtoFormat(pro.getContainerId()), + convertFromProtoFormat(pro.getLogAggregationState())); + } + } + + @Override + public synchronized void setContainersLogStatus( + Map containersLogStatus) { + maybeInitBuilder(); + if (this.containersLogStatus == null) { + builder.clearContainersLogStatus(); + } + this.containersLogStatus = containersLogStatus; + } + + @Override + public synchronized String getDiagnostic() { + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + if (! p.hasDiagnostic()) { + return null; + } + return p.getDiagnostic(); + } + + @Override + public synchronized void setDiagnostic(String diagnostic) { + maybeInitBuilder(); + if (diagnostic == null) { + builder.clearDiagnostic(); + return; + } + this.builder.setDiagnostic(diagnostic); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8ed7849..eb40989 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -33,9 +33,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -50,6 +52,7 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; + private List applicationLogStatus = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -80,6 +83,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.applicationLogStatus != null) { + addApplicationLogStatusToProto(); + } } private synchronized void mergeLocalToProto() { @@ -166,6 +172,42 @@ public void remove() { builder.addAllKeepAliveApplications(iterable); } + private synchronized void addApplicationLogStatusToProto() { + maybeInitBuilder(); + builder.clearApplicationLogStatus(); + if (applicationLogStatus == null) + { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = + applicationLogStatus.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationLogAggregationStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllApplicationLogStatus(iterable); + } + @Override public synchronized int getResponseId() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; @@ -255,7 +297,20 @@ private synchronized void initKeepAliveApplications() { } } - + + private synchronized void initApplicationLogStatus() { + if (this.applicationLogStatus != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getApplicationLogStatusList(); + this.applicationLogStatus = new ArrayList(); + + for (ApplicationLogAggregationStatusProto c : list) { + this.applicationLogStatus.add(convertFromProtoFormat(c)); + } + } + @Override public synchronized NodeHealthStatus getNodeHealthStatus() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; @@ -310,4 +365,31 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { private ApplicationIdProto convertToProtoFormat(ApplicationId c) { return ((ApplicationIdPBImpl)c).getProto(); } + + private ApplicationLogAggregationStatusProto convertToProtoFormat( + ApplicationLogAggregationStatus c) { + return ((ApplicationLogAggregationStatusPBImpl)c).getProto(); + } + + private ApplicationLogAggregationStatus convertFromProtoFormat( + ApplicationLogAggregationStatusProto c) { + return new ApplicationLogAggregationStatusPBImpl(c); + } + + @Override + public synchronized List + getApplicationLogAggregationStatus() { + initApplicationLogStatus(); + return this.applicationLogStatus; + } + + @Override + public synchronized void setApplicationLogAggregationStatus( + List applicationLogStatus) { + maybeInitBuilder(); + if (applicationLogStatus == null) { + builder.clearApplicationLogStatus(); + } + this.applicationLogStatus = applicationLogStatus; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 1b2a03e..00d9488 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -312,7 +313,8 @@ public static ApplicationReport newApplicationReport( String url, long startTime, long finishTime, FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, String origTrackingUrl, - float progress, String appType, Token amRmToken) { + float progress, String appType, LogAggregationState logAggregationState, + Token amRmToken) { ApplicationReport report = recordFactory .newRecordInstance(ApplicationReport.class); report.setApplicationId(applicationId); @@ -333,6 +335,7 @@ public static ApplicationReport newApplicationReport( report.setOriginalTrackingUrl(origTrackingUrl); report.setProgress(progress); report.setApplicationType(appType); + report.setLogAggregationState(logAggregationState); report.setAMRMToken(amRmToken); return report; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..7d1dc0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -30,12 +30,25 @@ enum NodeActionProto { SHUTDOWN = 2; } +message ContainerIdLogAggregateStateMapProto { + optional ContainerIdProto container_id = 1; + optional LogAggregationStateProto logAggregationState = 2; +} + +message ApplicationLogAggregationStatusProto { + optional ApplicationIdProto application_id = 1; + optional LogAggregationStateProto logAggregationState = 2; + repeated ContainerIdLogAggregateStateMapProto containersLogStatus = 3; + optional string diagnostic = 4; +} + message NodeStatusProto { optional NodeIdProto node_id = 1; optional int32 response_id = 2; repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated ApplicationLogAggregationStatusProto applicationLogStatus = 6; } message MasterKeyProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 23f8754..e9016f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -339,7 +339,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse, + remoteNodeStatus.getApplicationLogAggregationStatus())); return nodeHeartBeatResponse; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..2efdb2d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -211,4 +212,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Return the log aggregation State + * @return the log aggregation State of the RMApp + */ + LogAggregationState getLogAggregationState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index a2fa0e2..fba5e49 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -37,5 +37,8 @@ // Source: RMStateStore APP_NEW_SAVED, APP_UPDATE_SAVED, - APP_REMOVED + APP_REMOVED, + + // Source: RMNode + APP_LOG_AGGREGATION_STATUS_UPDATE } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083c..a05aaeb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -97,12 +100,19 @@ = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); + private final Map appLogAggregationCompleted = + new HashMap(); + private final Map appLogAggregationFailed = + new HashMap(); private final String applicationType; + private final long logAggregationTimeOut; + private final boolean logAggregationTimeOutDisabled; // Mutable fields private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private int expectedNumOfAppLogStatus = 0; private RMAppAttempt currentAttempt; private String queue; @SuppressWarnings("rawtypes") @@ -234,6 +244,9 @@ RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.KILL)) + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE, + new AppLogAggregationStatusUpdateTransition()) // Transitions from FAILED state // ignorable transitions @@ -249,7 +262,9 @@ RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) - + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE, + new AppLogAggregationStatusUpdateTransition()) .installTopology(); private final StateMachine @@ -302,6 +317,16 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.writeLock = lock.writeLock(); this.stateMachine = stateMachineFactory.make(this); + this.logAggregationTimeOut = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_WATTING_MS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_WATTING_MS); + if (logAggregationTimeOut <= 0) { + LOG.info("Log Aggregation time out is disabled because it is" + + " too small (" + logAggregationTimeOut + ")"); + this.logAggregationTimeOutDisabled = true; + } else { + this.logAggregationTimeOutDisabled = false; + } } @Override @@ -511,7 +536,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, this.name, host, rpcPort, clientToAMToken, createApplicationState(), diags, trackingUrl, this.startTime, this.finishTime, finishState, - appUsageReport, origTrackingUrl, progress, this.applicationType, + appUsageReport, origTrackingUrl, progress, this.applicationType, + getLogAggregationState(), amrmToken); } finally { this.readLock.unlock(); @@ -889,6 +915,15 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } + private static class AppLogAggregationStatusUpdateTransition extends RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppLogAggregationStatusUpdateEvent updateEvent = + (RMAppLogAggregationStatusUpdateEvent) event; + app.updateLogAggregationStatus(updateEvent.getApplicationLogStatus(), + updateEvent.getNodeId()); + } + } private static class AppKilledTransition extends FinalTransition { @Override @@ -911,6 +946,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { super.transition(app, event); } } + private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -933,6 +969,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { @SuppressWarnings("unchecked") public void transition(RMAppImpl app, RMAppEvent event) { Set nodes = getNodesOnWhichAttemptRan(app); + app.expectedNumOfAppLogStatus = nodes.size(); for (NodeId nodeId : nodes) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); @@ -1017,4 +1054,54 @@ public YarnApplicationState createApplicationState() { throw new YarnRuntimeException("Unknown state passed!"); } } + + private void updateLogAggregationStatus(ApplicationLogAggregationStatus status, NodeId nodeId) { + this.writeLock.lock(); + try { + if (status.getLogAggregationState() == LogAggregationState.COMPLETED) { + appLogAggregationCompleted.put(nodeId, status); + } else if (status.getLogAggregationState() == LogAggregationState.FAILED) { + appLogAggregationFailed.put(nodeId, status); + } + if (status.getDiagnostic() != null && !status.getDiagnostic().isEmpty()) { + this.diagnostics.append(status.getDiagnostic() + "\n"); + } + } finally { + this.writeLock.unlock(); + } + } + + @Override + public LogAggregationState getLogAggregationState() { + this.readLock.lock(); + try { + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + return LogAggregationState.DISABLED; + } + RMAppState currentState = getState(); + if (currentState != RMAppState.FINISHED + && currentState != RMAppState.KILLED + && currentState != RMAppState.FAILED) { + return LogAggregationState.NOT_STARTED; + } + if (appLogAggregationFailed.size() > 0 || expectedNumOfAppLogStatus == 0) { + return LogAggregationState.FAILED; + } else if (appLogAggregationCompleted.size() == expectedNumOfAppLogStatus) { + return LogAggregationState.COMPLETED; + } else { + if (this.logAggregationTimeOutDisabled) { + return LogAggregationState.IN_PROGRESS; + } else { + if (System.currentTimeMillis() - this.finishTime <= + this.logAggregationTimeOut) { + return LogAggregationState.IN_PROGRESS; + } + return LogAggregationState.TIME_OUT; + } + } + } finally { + this.readLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java new file mode 100644 index 0000000..135332b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; + +public class RMAppLogAggregationStatusUpdateEvent extends RMAppEvent { + + private final NodeId nodeId; + private final ApplicationLogAggregationStatus applicationLogStatus; + + public RMAppLogAggregationStatusUpdateEvent(ApplicationId appId, + NodeId nodeId, ApplicationLogAggregationStatus applicationLogStatus) { + super(appId, RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE); + this.nodeId = nodeId; + this.applicationLogStatus = applicationLogStatus; + } + + public NodeId getNodeId() { + return this.nodeId; + }; + + public ApplicationLogAggregationStatus getApplicationLogStatus() { + return this.applicationLogStatus; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 52bc285..d37e000 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -48,11 +48,14 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLogAggregationStatusUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -648,6 +651,11 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { statusEvent.getKeepAliveAppIds()); } + if (statusEvent.getApplicationLogAggregationStatus() != null + && !statusEvent.getApplicationLogAggregationStatus().isEmpty()) { + rmNode.sendApplicationLogStatus(statusEvent + .getApplicationLogAggregationStatus()); + } return NodeState.RUNNING; } } @@ -694,6 +702,20 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return latestContainerInfoList; } + private void sendApplicationLogStatus( + List applicationLogStatus) { + for(ApplicationLogAggregationStatus status : applicationLogStatus) { + RMApp app = + context.getRMApps().get( + status.getApplicationId()); + if (app != null) { + this.context.getDispatcher().getEventHandler().handle( + new RMAppLogAggregationStatusUpdateEvent( + status.getApplicationId(), this.nodeId, status)); + } + } + } + @VisibleForTesting public void setNextHeartBeat(boolean nextHeartBeat) { this.nextHeartBeat = nextHeartBeat; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index abfacbb..c8234de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; public class RMNodeStatusEvent extends RMNodeEvent { @@ -32,15 +33,18 @@ private final List containersCollection; private final NodeHeartbeatResponse latestResponse; private final List keepAliveAppIds; + private final List applicationLogStatus; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, - NodeHeartbeatResponse latestResponse) { + NodeHeartbeatResponse latestResponse, + List applicationLogStatus) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; + this.applicationLogStatus = applicationLogStatus; } public NodeHealthStatus getNodeHealthStatus() { @@ -58,4 +62,9 @@ public NodeHeartbeatResponse getLatestResponse() { public List getKeepAliveAppIds() { return this.keepAliveAppIds; } + + public List + getApplicationLogAggregationStatus() { + return this.applicationLogStatus; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java new file mode 100644 index 0000000..9909620 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java @@ -0,0 +1,514 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.TestRMAppTransitions; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +public class TestLogAggregationStatus { + static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); + + private boolean isSecurityEnabled; + private Configuration conf; + private RMContext rmContext; + private static int maxAppAttempts = + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; + private static int container_id = 1; + private static int host_id = 1; + private List nodes = new ArrayList(); + private ApplicationId applicationId = MockApps.newAppID(1); + private DrainDispatcher rmDispatcher; + private RMStateStore store; + private YarnScheduler scheduler; + private final int max_wait_time = 20; + private Container amContainer; + + // handle all the RM application attempt events + private static final class TestApplicationAttemptEventDispatcher implements + EventHandler { + + private final RMContext rmContext; + public TestApplicationAttemptEventDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMAppAttemptEvent event) { + ApplicationId appId = event.getApplicationAttemptId().getApplicationId(); + RMApp rmApp = this.rmContext.getRMApps().get(appId); + if (rmApp != null) { + try { + rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + appId, t); + } + } + } + } + + // handle all the RM application events - same as in ResourceManager.java + private static final class TestApplicationEventDispatcher implements + EventHandler { + + private final RMContext rmContext; + public TestApplicationEventDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMAppEvent event) { + ApplicationId appID = event.getApplicationId(); + RMApp rmApp = this.rmContext.getRMApps().get(appID); + if (rmApp != null) { + try { + rmApp.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + appID, t); + } + } + } + } + + // handle all the RM application manager events - same as in + // ResourceManager.java + private static final class TestApplicationManagerEventDispatcher implements + EventHandler { + @Override + public void handle(RMAppManagerEvent event) { + } + } + + // handle all the scheduler events - same as in ResourceManager.java + private static final class TestSchedulerEventDispatcher implements + EventHandler { + @Override + public void handle(SchedulerEvent event) { + } + } + + // handle all the RM Node events + private static final class TestRMNodeEventDispatcher implements + EventHandler { + @Override + public void handle(RMNodeEvent event) { + } + } + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE; + if (isSecurityEnabled) { + authMethod = AuthenticationMethod.KERBEROS; + } + SecurityUtil.setAuthenticationMethod(authMethod, conf); + UserGroupInformation.setConfiguration(conf); + + rmDispatcher = new DrainDispatcher(); + ContainerAllocationExpirer containerAllocationExpirer = + mock(ContainerAllocationExpirer.class); + AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); + AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); + store = mock(RMStateStore.class); + this.rmContext = + new RMContextImpl(rmDispatcher, store, + containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, + null, new AMRMTokenSecretManager(conf), + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM()); + + rmDispatcher.register(RMAppAttemptEventType.class, + new TestApplicationAttemptEventDispatcher(this.rmContext)); + + rmDispatcher.register(RMAppEventType.class, + new TestApplicationEventDispatcher(rmContext)); + + rmDispatcher.register(RMAppManagerEventType.class, + new TestApplicationManagerEventDispatcher()); + + rmDispatcher.register(SchedulerEventType.class, + new TestSchedulerEventDispatcher()); + + NodesListManager mockNodeListManager = mock(NodesListManager.class); + doAnswer(new Answer() { + + public Object answer(InvocationOnMock invocation) { + return null; + } + }).when(mockNodeListManager).handle(any(NodesListManagerEvent.class)); + rmDispatcher.register(NodesListManagerEventType.class, mockNodeListManager); + + rmDispatcher.register(RMNodeEventType.class, new TestRMNodeEventDispatcher()); + + ApplicationMasterLauncher mockAMLauncher = mock(ApplicationMasterLauncher.class); + doAnswer(new Answer() { + + public Object answer(InvocationOnMock invocation) { + return null; + } + }).when(mockAMLauncher).handle(any(AMLauncherEvent.class)); + rmDispatcher.register(AMLauncherEventType.class, mockAMLauncher); + rmDispatcher.init(conf); + rmDispatcher.start(); + } + + @Test(timeout = 5000) + public void testLogAggregationDisableState() { + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.DISABLED); + } + + @Test (timeout = 5000) + public void testLogAggregationNotStartState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.NOT_STARTED); + } + + @Test (timeout = 5000) + public void testLogAggregationInProgressState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.IN_PROGRESS); + Map status = + new HashMap(); + status.put(nodes.get(0), LogAggregationState.IN_PROGRESS); + for (int i=1; i < nodes.size(); i++) { + status.put(nodes.get(i), LogAggregationState.COMPLETED); + } + rmNodeKickOffLogAggregationUpdate(status); + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.IN_PROGRESS); + } + + @Test (timeout = 5000) + public void testLogAggregationCompletedState() throws Exception{ + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + Map status = + new HashMap(); + for (NodeId id : nodes) { + status.put(id, LogAggregationState.COMPLETED); + } + rmNodeKickOffLogAggregationUpdate(status); + int times = 0; + while (times <= max_wait_time + && app.getLogAggregationState() != LogAggregationState.COMPLETED) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.COMPLETED); + } + + @Test (timeout = 5000) + public void testLogAggregationFailedState() { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createRunningApp(null, conf); + moveToFinishedState(app); + Map status = + new HashMap(); + status.put(nodes.get(0), LogAggregationState.FAILED); + for (int i=1; i < nodes.size(); i++) { + status.put(nodes.get(i), LogAggregationState.COMPLETED); + } + rmNodeKickOffLogAggregationUpdate(status); + int times = 0; + while (times <= max_wait_time + && app.getLogAggregationState() != LogAggregationState.FAILED) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + Assert.assertEquals(app.getLogAggregationState(), + LogAggregationState.FAILED); + } + private void rmNodeKickOffLogAggregationUpdate(Map status) { + for (Entry entry : status.entrySet()) { + RMNodeImpl node = + new RMNodeImpl(entry.getKey(), rmContext, null, 0, 0, null, null, null); + node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + NodeHealthStatus healthStatus = + NodeHealthStatus.newInstance(true, null, 1000); + NodeHeartbeatResponse lastResponse = + Records.newRecord(NodeHeartbeatResponse.class); + ApplicationLogAggregationStatus state = + ApplicationLogAggregationStatus.newInstance(applicationId, + entry.getValue(), null, "Test"); + RMNodeStatusEvent event = + new RMNodeStatusEvent(node.getNodeID(), healthStatus, + new ArrayList(), null, lastResponse, + Collections.singletonList(state)); + node.handle(event); + } + } + + @SuppressWarnings("unchecked") + private RMApp createRunningApp( + ApplicationSubmissionContext submissionContext, Configuration conf) { + String user = MockApps.newUserName(); + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + // ensure max application attempts set to known value + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts); + scheduler = mock(YarnScheduler.class); + + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + + if (submissionContext == null) { + submissionContext = new ApplicationSubmissionContextPBImpl(); + } + // applicationId will not be used because RMStateStore is mocked, + // but applicationId is still set for safety + submissionContext.setApplicationId(applicationId); + + RMApp application = + new RMAppImpl(applicationId, rmContext, conf, name, user, queue, + submissionContext, scheduler, masterService, + System.currentTimeMillis(), "YARN"); + this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), + application); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.START); + application.handle(event); + assertAppState(RMAppState.NEW_SAVING, application); + event = + new RMAppNewSavedEvent(application.getApplicationId(), null); + application.handle(event); + assertAppState(RMAppState.SUBMITTED, application); + RMAppAttempt currentAttempt = application.getCurrentAppAttempt(); + int times = 0; + while (currentAttempt.getAppAttemptState() + != RMAppAttemptState.SUBMITTED && times <= max_wait_time) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + Assert.assertTrue(currentAttempt. + getAppAttemptState() == RMAppAttemptState.SUBMITTED); + currentAttempt.handle( + new RMAppAttemptEvent(currentAttempt.getAppAttemptId(), + RMAppAttemptEventType.APP_ACCEPTED)); + assertAppState(RMAppState.ACCEPTED, application); + amContainer = createAMContainer(currentAttempt); + Allocation allocation = mock(Allocation.class); + when(allocation.getContainers()).thenReturn( + Collections.singletonList(amContainer)); + when(allocation.getContainers()). + thenReturn(Collections.singletonList(amContainer)); + when( + scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), + any(List.class), any(List.class), any(List.class))).thenReturn( + allocation); + currentAttempt.handle( + new RMAppAttemptContainerAllocatedEvent(currentAttempt + .getAppAttemptId(), amContainer)); + currentAttempt + .handle(createRMAppAttemptContainerAcquiredEvent(currentAttempt + .getAppAttemptId())); + currentAttempt + .handle(createRMAppAttemptContainerAcquiredEvent(currentAttempt + .getAppAttemptId())); + currentAttempt + .handle(createRMAppAttemptContainerAcquiredEvent(currentAttempt + .getAppAttemptId())); + currentAttempt + .handle(createRMAppAttemptContainerAcquiredEvent(currentAttempt + .getAppAttemptId())); + currentAttempt + .handle( + new RMAppAttemptNewSavedEvent(currentAttempt + .getAppAttemptId(), null)); + currentAttempt.handle( + new RMAppAttemptEvent(currentAttempt + .getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); + currentAttempt.handle( + new RMAppAttemptRegistrationEvent(currentAttempt + .getAppAttemptId(), null, 0, null)); + + times = 0; + while (application.getState() != RMAppState.RUNNING + && times <= max_wait_time) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + assertAppState(RMAppState.RUNNING, application); + Assert.assertTrue(currentAttempt.getAppAttemptState() == + RMAppAttemptState.RUNNING); + return application; + } + + private static void assertAppState(RMAppState state, RMApp application) { + Assert.assertEquals("application state should have been " + state, + state, application.getState()); + } + + private RMApp moveToFinishedState(RMApp application) { + RMAppAttempt current = application.getCurrentAppAttempt(); + current.handle(new RMAppAttemptUnregistrationEvent(current + .getAppAttemptId(), null, FinalApplicationStatus.SUCCEEDED, "")); + ContainerStatus status = + ContainerStatus.newInstance(amContainer.getId(), + ContainerState.COMPLETE, "", 0); + current.handle(new RMAppAttemptContainerFinishedEvent(current + .getAppAttemptId(), status)); + current.handle(new RMAppAttemptUpdateSavedEvent( + current.getAppAttemptId(), null)); + application.handle(new RMAppUpdateSavedEvent( + application.getApplicationId(), null)); + int times = 0; + while (application.getState() != RMAppState.FINISHED + && times <= max_wait_time) { + try { + Thread.sleep(50); + times++; + } catch (InterruptedException e) { + // Do nothing + } + } + assertAppState(RMAppState.FINISHED, application); + return application; + } + + private Container createAMContainer(RMAppAttempt currentAttempt) { + Container amContainer = mock(Container.class); + Resource resource = BuilderUtils.newResource(2048, 1); + when(amContainer.getId()).thenReturn( + BuilderUtils.newContainerId(currentAttempt + .getAppAttemptId(), container_id ++)); + when(amContainer.getResource()).thenReturn(resource); + return amContainer; + } + + private RMAppAttemptContainerAcquiredEvent + createRMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId) { + Container mockContainer = mock(Container.class); + NodeId nodeId = NodeId.newInstance("host_"+(host_id ++), 19); + nodes.add(nodeId); + when(mockContainer.getNodeId()).thenReturn(nodeId); + RMAppAttemptContainerAcquiredEvent event = + new RMAppAttemptContainerAcquiredEvent(appAttemptId, mockContainer); + return event; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 82046c7..f097471 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -465,7 +465,7 @@ private RMNodeImpl getUnhealthyNode() { NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); node.handle(new RMNodeStatusEvent(node.getNodeID(), status, - new ArrayList(), null, null)); + new ArrayList(), null, null, null)); Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 73b9cf7..5344c2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -153,6 +154,11 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public LogAggregationState getLogAggregationState() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bcb2f6f..4f39464 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -226,4 +227,9 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { return null; } + + @Override + public LogAggregationState getLogAggregationState() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index b26c37f..9f806bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -149,7 +149,7 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException, NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false, "test health report", System.currentTimeMillis()); node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth, - new ArrayList(), null, null)); + new ArrayList(), null, null, null)); rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY); ClientResponse response =