diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index 03552e4..fc825cf 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -90,7 +91,7 @@ private ApplicationReport getUnknownApplicationReport() { return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, - YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + YarnConfiguration.DEFAULT_APPLICATION_TYPE, LogAggregationState.DISABLED, null); } NotRunningJob(ApplicationReport applicationReport, JobState jobState) { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index 7b47711..abbb92e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -470,9 +471,10 @@ private ApplicationReport getFinishedApplicationReport() { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( appId, 0); return ApplicationReport.newInstance(appId, attemptId, "user", "queue", - "appname", "host", 124, null, YarnApplicationState.FINISHED, - "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, - "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLED, null); } private ApplicationReport getRunningApplicationReport(String host, int port) { @@ -480,9 +482,11 @@ private ApplicationReport getRunningApplicationReport(String host, int port) { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( appId, 0); return ApplicationReport.newInstance(appId, attemptId, "user", "queue", - "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics", - "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, - YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", host, port, null, YarnApplicationState.RUNNING, + "diagnostics", + "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, + YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLED, null); } private ResourceMgrDelegate getRMDelegate() throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index 0854fdb..d49a3ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -40,6 +40,7 @@ *
  • {@link YarnApplicationState} of the application.
  • *
  • Diagnostic information in case of errors.
  • *
  • Start time of the application.
  • + *
  • {@link LogAggregationState} of the application.
  • *
  • Client {@link Token} of the application (if security is enabled).
  • * *

    @@ -58,7 +59,8 @@ public static ApplicationReport newInstance(ApplicationId applicationId, YarnApplicationState state, String diagnostics, String url, long startTime, long finishTime, FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, String origTrackingUrl, - float progress, String applicationType, Token amRmToken) { + float progress, String applicationType, + LogAggregationState logAggregationState, Token amRmToken) { ApplicationReport report = Records.newRecord(ApplicationReport.class); report.setApplicationId(applicationId); report.setCurrentApplicationAttemptId(applicationAttemptId); @@ -78,6 +80,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId, report.setOriginalTrackingUrl(origTrackingUrl); report.setProgress(progress); report.setApplicationType(applicationType); + report.setLogAggregationState(logAggregationState); report.setAMRMToken(amRmToken); return report; } @@ -349,4 +352,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId, @Stable public abstract Token getAMRMToken(); + /** + * Get the application's log aggregation state + * @return application's log aggregation state + */ + @Public + @Stable + public abstract LogAggregationState getLogAggregationState(); + + @Private + @Stable + public abstract void setLogAggregationState( + LogAggregationState logAggregationState); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java new file mode 100644 index 0000000..0a6746b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + * Enumeration of various states of log aggregation. + */ + +@Public +@Stable +public enum LogAggregationState { + /**The log aggregation is disabled. */ + DISABLED, + + /** The log aggregation is not started. */ + NOT_STARTED, + + /** The log aggregation is in progress. */ + IN_PROGRESS, + + /** The log aggregation is failed. */ + FAILED, + + /** The log aggregation is finished. */ + FINISHED, + + /** The time which waits for log aggregation to finish + * exceeds the maximum wait time. + */ + TIME_OUT +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e..9bd567e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -495,13 +495,20 @@ + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - /** + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. */ public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX + "log-aggregation.retain-seconds"; public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; + + /** + * How long to wait for log aggregation to finish. + */ + public static final String LOG_AGGREGATION_MAX_WAIT_SECONDS = YARN_PREFIX + + "log-aggregation.max-wait.seconds"; + public static final long DEFAULT_LOG_AGGREGATION_MAX_WAIT_SECONDS = 10 * 60; /** * How long to wait between aggregated log retention checks. If set to diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9d4d59e..f5ad4a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -156,7 +156,8 @@ message ApplicationReportProto { optional ApplicationAttemptIdProto currentApplicationAttemptId = 16; optional float progress = 17; optional string applicationType = 18; - optional hadoop.common.TokenProto am_rm_token = 19; + optional LogAggregationStateProto logAggregationState = 19; + optional hadoop.common.TokenProto am_rm_token = 20; } enum NodeStateProto { @@ -185,7 +186,14 @@ message NodeReportProto { optional int64 last_health_report_time = 9; } - +enum LogAggregationStateProto { + LOG_AGGREGATION_DISABLED = 0; + LOG_AGGREGATION_NOT_STARTED = 1; + LOG_AGGREGATION_IN_PROGRESS = 2; + LOG_AGGREGATION_FAILED = 3; + LOG_AGGREGATION_FINISHED = 4; + LOG_AGGREGATION_TIME_OUT = 5; +} //////////////////////////////////////////////////////////////////////// ////// From AM_RM_Protocol ///////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index a7b7d65..e99a5e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -278,6 +278,8 @@ private void printApplicationReport(String applicationId) appReportStr.println(appReport.getYarnApplicationState()); appReportStr.print("\tFinal-State : "); appReportStr.println(appReport.getFinalApplicationStatus()); + appReportStr.print("\tLog-Aggregation-State : "); + appReportStr.println(appReport.getLogAggregationState()); appReportStr.print("\tTracking-URL : "); appReportStr.println(appReport.getOriginalTrackingUrl()); appReportStr.print("\tRPC Port : "); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 826433d..ba8da09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -271,7 +272,8 @@ public void setYarnApplicationState(YarnApplicationState state) { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_STARTED, null); List applicationReports = new ArrayList(); applicationReports.add(newApplicationReport); @@ -282,7 +284,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user2", "queue2", "appname2", "host2", 125, null, YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", - null); + LogAggregationState.FINISHED, null); applicationReports.add(newApplicationReport2); ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); @@ -291,7 +293,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user3", "queue3", "appname3", "host3", 126, null, YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", - null); + LogAggregationState.NOT_STARTED, null); applicationReports.add(newApplicationReport3); ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); @@ -302,7 +304,7 @@ public void setYarnApplicationState(YarnApplicationState state) { "user4", "queue4", "appname4", "host4", 127, null, YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, - "NON-MAPREDUCE", null); + "NON-MAPREDUCE", LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport4); return applicationReports; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 1d08f24..fb54f18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -84,7 +85,8 @@ public void testGetApplicationReport() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.DISABLED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); int result = cli.run(new String[] { "-status", applicationId.toString() }); @@ -103,6 +105,7 @@ public void testGetApplicationReport() throws Exception { pw.println("\tProgress : 53.79%"); pw.println("\tState : FINISHED"); pw.println("\tFinal-State : SUCCEEDED"); + pw.println("\tLog-Aggregation-State : DISABLED"); pw.println("\tTracking-URL : N/A"); pw.println("\tRPC Port : 124"); pw.println("\tAM Host : host"); @@ -138,7 +141,8 @@ public void testGetApplications() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_STARTED, null); List applicationReports = new ArrayList(); applicationReports.add(newApplicationReport); @@ -147,8 +151,8 @@ public void testGetApplications() throws Exception { applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2), "user2", "queue2", "appname2", "host2", 125, null, YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", - null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", + LogAggregationState.FINISHED, null); applicationReports.add(newApplicationReport2); ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); @@ -157,7 +161,7 @@ public void testGetApplications() throws Exception { "user3", "queue3", "appname3", "host3", 126, null, YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", - null); + LogAggregationState.NOT_STARTED, null); applicationReports.add(newApplicationReport3); ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); @@ -166,7 +170,7 @@ public void testGetApplications() throws Exception { "user4", "queue4", "appname4", "host4", 127, null, YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, "NON-MAPREDUCE", - null); + LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport4); ApplicationId applicationId5 = ApplicationId.newInstance(1234, 9); @@ -175,7 +179,7 @@ public void testGetApplications() throws Exception { "user5", "queue5", "appname5", "host5", 128, null, YarnApplicationState.ACCEPTED, "diagnostics5", "url5", 5, 5, FinalApplicationStatus.KILLED, null, "N/A", 0.93789f, "HIVE", - null); + LogAggregationState.FAILED, null); applicationReports.add(newApplicationReport5); ApplicationId applicationId6 = ApplicationId.newInstance(1234, 10); @@ -184,7 +188,7 @@ public void testGetApplications() throws Exception { "user6", "queue6", "appname6", "host6", 129, null, YarnApplicationState.SUBMITTED, "diagnostics6", "url6", 6, 6, FinalApplicationStatus.KILLED, null, "N/A", 0.99789f, "PIG", - null); + LogAggregationState.IN_PROGRESS, null); applicationReports.add(newApplicationReport6); // Test command yarn application -list @@ -546,7 +550,8 @@ public void testKillApplication() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.FINISHED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport2); int result = cli.run(new String[] { "-kill", applicationId.toString() }); @@ -559,7 +564,8 @@ public void testKillApplication() throws Exception { applicationId, ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", "appname", "host", 124, null, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + LogAggregationState.NOT_STARTED, null); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); result = cli.run(new String[] { "-kill", applicationId.toString() }); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index 9716f74..b65e18f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.TextFormat; @@ -516,4 +518,34 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + + @Override + public LogAggregationState getLogAggregationState() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if(!p.hasLogAggregationState()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationState()); + } + + private LogAggregationState convertFromProtoFormat( + LogAggregationStateProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + @Override + public void setLogAggregationState( + LogAggregationState logAggregationState) { + maybeInitBuilder(); + if (logAggregationState == null) { + builder.clearLogAggregationState(); + return; + } + builder.setLogAggregationState(convertToProtoFormat(logAggregationState)); + } + + private LogAggregationStateProto convertToProtoFormat( + LogAggregationState logAggregationState) { + return ProtoUtils.convertToProtoFormat(logAggregationState); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index b660f7d..6a9cb34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto; @@ -206,4 +208,15 @@ public static ApplicationAccessType convertFromProtoFormat( return ApplicationAccessType.valueOf(e.name().replace( APP_ACCESS_TYPE_PREFIX, "")); } + + /* + * LogAggregationState + */ + private static String LOG_AGGREGATION_PREFIX = "LOG_AGGREGATION_"; + public static LogAggregationStateProto convertToProtoFormat(LogAggregationState e) { + return LogAggregationStateProto.valueOf(LOG_AGGREGATION_PREFIX + e.name()); + } + public static LogAggregationState convertFromProtoFormat(LogAggregationStateProto e) { + return LogAggregationState.valueOf(e.name().replace(LOG_AGGREGATION_PREFIX, "")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java index 9302d4b..b0e7aa4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assert; @@ -56,9 +57,10 @@ protected static ApplicationReport createApplicationReport( ApplicationAttemptId.newInstance(appId, appAttemptIdInt); ApplicationReport appReport = ApplicationReport.newInstance(appId, appAttemptId, "user", "queue", - "appname", "host", 124, null, YarnApplicationState.FINISHED, - "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, - "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null); + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, + LogAggregationState.DISABLED, null); return appReport; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java new file mode 100644 index 0000000..13eeb23 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.util.Records; + +/** + *

    ApplicationLogStatus is a data structure of the + * log aggregation state for the Application.

    + * + *

    It includes information such as: + *

      + *
    • ApplicationId
    • + *
    • LogAggregation State of this application
    • + *
    • Set of LogAggregationState for all containers
    • + *
    • Diagnostic information
    • + *
    + *

    + * + * @see NodeStatus + */ + +public abstract class ApplicationLogAggregationStatus { + @Private + public static ApplicationLogAggregationStatus newInstance(ApplicationId appId, + LogAggregationState appLogState, + Map containersLogStatus, String diagnostic) { + ApplicationLogAggregationStatus appLogStatus = + Records.newRecord(ApplicationLogAggregationStatus.class); + appLogStatus.setApplicationId(appId); + appLogStatus.setLogAggregationState(appLogState); + appLogStatus.setContainersLogStatuses(containersLogStatus); + appLogStatus.setDiagnostic(diagnostic); + return appLogStatus; + } + + @Public + @Stable + public abstract ApplicationId getApplicationId(); + + @Private + @Unstable + public abstract void setApplicationId(ApplicationId applicationId); + + @Public + @Stable + public abstract LogAggregationState getLogAggregationState(); + + @Private + @Unstable + public abstract void setLogAggregationState( + LogAggregationState logAggregationState); + + @Public + @Stable + public abstract Map getContainersLogStatuses(); + + @Private + @Unstable + public abstract void setContainersLogStatuses( + Map containersLogStatus); + + @Public + @Stable + public abstract String getDiagnostic(); + + @Private + @Unstable + public abstract void setDiagnostic(String diagnostic); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 8e98703..c817264 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -41,4 +41,9 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract List + getApplicationLogAggregationStatuses(); + public abstract void setApplicationLogAggregationStatuses( + List applicationLogAggregationStatuses); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java new file mode 100644 index 0000000..6239dd8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerIdLogAggregateStateMapProto; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; + +import com.google.protobuf.TextFormat; + +public class ApplicationLogAggregationStatusPBImpl extends ApplicationLogAggregationStatus{ + + ApplicationLogAggregationStatusProto proto = + ApplicationLogAggregationStatusProto.getDefaultInstance(); + ApplicationLogAggregationStatusProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Map containersLogStatus = null; + + public ApplicationLogAggregationStatusPBImpl() { + builder = ApplicationLogAggregationStatusProto.newBuilder(); + } + + public ApplicationLogAggregationStatusPBImpl(ApplicationLogAggregationStatusProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized ApplicationLogAggregationStatusProto getProto() { + mergeLocalToProto(); + this.proto = this.viaProto ? this.proto : this.builder.build(); + this.viaProto = true; + return this.proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private synchronized void mergeLocalToProto() { + if (this.viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + this.proto = this.builder.build(); + this.viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (this.viaProto || this.builder == null) { + this.builder = ApplicationLogAggregationStatusProto.newBuilder(this.proto); + } + this.viaProto = false; + } + + private synchronized void mergeLocalToBuilder() { + if (this.applicationId != null + && !((ApplicationIdPBImpl) this.applicationId).getProto().equals( + builder.getApplicationId())) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.containersLogStatus != null) { + addContainersLogStatusToProto(); + } + } + + private synchronized void addContainersLogStatusToProto() { + maybeInitBuilder(); + builder.clearContainersLogStatus(); + if (this.containersLogStatus == null) { + return; + } + Iterable values = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator contaierIdIterator = containersLogStatus + .keySet().iterator(); + + @Override + public boolean hasNext() { + return contaierIdIterator.hasNext(); + } + + @Override + public ContainerIdLogAggregateStateMapProto next() { + ContainerId key = contaierIdIterator.next(); + return ContainerIdLogAggregateStateMapProto.newBuilder() + .setLogAggregationState( + convertToProtoFormat(containersLogStatus.get(key))) + .setContainerId( + convertToProtoFormat(key)).build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllContainersLogStatus(values); + } + + private ContainerIdProto convertToProtoFormat(ContainerId containerId) { + return ((ContainerIdPBImpl) containerId).getProto(); + } + + private ContainerId convertFromProtoFormat(ContainerIdProto proto) { + return new ContainerIdPBImpl(proto); + } + + @Override + public synchronized ApplicationId getApplicationId() { + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + private ApplicationId convertFromProtoFormat(ApplicationIdProto proto) { + return new ApplicationIdPBImpl(proto); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId appId) { + return ((ApplicationIdPBImpl)appId).getProto(); + } + + @Override + public synchronized void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + } + this.applicationId = applicationId; + } + + @Override + public synchronized LogAggregationState getLogAggregationState() { + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasLogAggregationState()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationState()); + } + + private LogAggregationState convertFromProtoFormat( + LogAggregationStateProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + @Override + public synchronized void setLogAggregationState( + LogAggregationState logAggregationState) { + maybeInitBuilder(); + if (logAggregationState == null) { + builder.clearLogAggregationState(); + return; + } + builder.setLogAggregationState(convertToProtoFormat(logAggregationState)); + } + + private LogAggregationStateProto convertToProtoFormat( + LogAggregationState logAggregationState) { + return ProtoUtils.convertToProtoFormat(logAggregationState); + } + + @Override + public synchronized Map + getContainersLogStatuses() { + initContainersLogStatus(); + return this.containersLogStatus; + } + + private synchronized void initContainersLogStatus() { + if (this.containersLogStatus != null) { + return; + } + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + List list = + p.getContainersLogStatusList(); + this.containersLogStatus = + new HashMap(list.size()); + for (ContainerIdLogAggregateStateMapProto pro : list) { + containersLogStatus.put(convertFromProtoFormat(pro.getContainerId()), + convertFromProtoFormat(pro.getLogAggregationState())); + } + } + + @Override + public synchronized void setContainersLogStatuses( + Map containersLogStatus) { + maybeInitBuilder(); + if (this.containersLogStatus == null) { + builder.clearContainersLogStatus(); + } + this.containersLogStatus = containersLogStatus; + } + + @Override + public synchronized String getDiagnostic() { + ApplicationLogAggregationStatusProtoOrBuilder p = + viaProto ? proto : builder; + if (! p.hasDiagnostic()) { + return null; + } + return p.getDiagnostic(); + } + + @Override + public synchronized void setDiagnostic(String diagnostic) { + maybeInitBuilder(); + if (diagnostic == null) { + builder.clearDiagnostic(); + return; + } + this.builder.setDiagnostic(diagnostic); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8ed7849..9e3f9ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -33,9 +33,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -50,6 +52,7 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; + private List applicationLogStatus = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -80,6 +83,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.applicationLogStatus != null) { + addApplicationLogStatusToProto(); + } } private synchronized void mergeLocalToProto() { @@ -166,6 +172,42 @@ public void remove() { builder.addAllKeepAliveApplications(iterable); } + private synchronized void addApplicationLogStatusToProto() { + maybeInitBuilder(); + builder.clearApplicationLogStatus(); + if (applicationLogStatus == null) + { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = + applicationLogStatus.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationLogAggregationStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllApplicationLogStatus(iterable); + } + @Override public synchronized int getResponseId() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; @@ -255,7 +297,20 @@ private synchronized void initKeepAliveApplications() { } } - + + private synchronized void initApplicationLogStatus() { + if (this.applicationLogStatus != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getApplicationLogStatusList(); + this.applicationLogStatus = new ArrayList(); + + for (ApplicationLogAggregationStatusProto c : list) { + this.applicationLogStatus.add(convertFromProtoFormat(c)); + } + } + @Override public synchronized NodeHealthStatus getNodeHealthStatus() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; @@ -310,4 +365,31 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { private ApplicationIdProto convertToProtoFormat(ApplicationId c) { return ((ApplicationIdPBImpl)c).getProto(); } + + private ApplicationLogAggregationStatusProto convertToProtoFormat( + ApplicationLogAggregationStatus c) { + return ((ApplicationLogAggregationStatusPBImpl)c).getProto(); + } + + private ApplicationLogAggregationStatus convertFromProtoFormat( + ApplicationLogAggregationStatusProto c) { + return new ApplicationLogAggregationStatusPBImpl(c); + } + + @Override + public synchronized List + getApplicationLogAggregationStatuses() { + initApplicationLogStatus(); + return this.applicationLogStatus; + } + + @Override + public synchronized void setApplicationLogAggregationStatuses( + List applicationLogStatuses) { + maybeInitBuilder(); + if (applicationLogStatuses == null) { + builder.clearApplicationLogStatus(); + } + this.applicationLogStatus = applicationLogStatuses; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 1b2a03e..00d9488 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -312,7 +313,8 @@ public static ApplicationReport newApplicationReport( String url, long startTime, long finishTime, FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, String origTrackingUrl, - float progress, String appType, Token amRmToken) { + float progress, String appType, LogAggregationState logAggregationState, + Token amRmToken) { ApplicationReport report = recordFactory .newRecordInstance(ApplicationReport.class); report.setApplicationId(applicationId); @@ -333,6 +335,7 @@ public static ApplicationReport newApplicationReport( report.setOriginalTrackingUrl(origTrackingUrl); report.setProgress(progress); report.setApplicationType(appType); + report.setLogAggregationState(logAggregationState); report.setAMRMToken(amRmToken); return report; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..7d1dc0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -30,12 +30,25 @@ enum NodeActionProto { SHUTDOWN = 2; } +message ContainerIdLogAggregateStateMapProto { + optional ContainerIdProto container_id = 1; + optional LogAggregationStateProto logAggregationState = 2; +} + +message ApplicationLogAggregationStatusProto { + optional ApplicationIdProto application_id = 1; + optional LogAggregationStateProto logAggregationState = 2; + repeated ContainerIdLogAggregateStateMapProto containersLogStatus = 3; + optional string diagnostic = 4; +} + message NodeStatusProto { optional NodeIdProto node_id = 1; optional int32 response_id = 2; repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated ApplicationLogAggregationStatusProto applicationLogStatus = 6; } message MasterKeyProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 23f8754..db57f61 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -339,7 +339,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse, + remoteNodeStatus.getApplicationLogAggregationStatuses())); return nodeHeartBeatResponse; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..2efdb2d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -211,4 +212,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Return the log aggregation State + * @return the log aggregation State of the RMApp + */ + LogAggregationState getLogAggregationState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index a2fa0e2..fba5e49 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -37,5 +37,8 @@ // Source: RMStateStore APP_NEW_SAVED, APP_UPDATE_SAVED, - APP_REMOVED + APP_REMOVED, + + // Source: RMNode + APP_LOG_AGGREGATION_STATUS_UPDATE } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083c..1e42994 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -97,12 +100,18 @@ = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); + private final Map appLogAggregationCompleted = + new HashMap(); + private final Map appLogAggregationFailed = + new HashMap(); private final String applicationType; + private final long logAggregationTimeOut; // Mutable fields private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private int expectedNumOfAppLogStatus = 0; private RMAppAttempt currentAttempt; private String queue; @SuppressWarnings("rawtypes") @@ -234,11 +243,17 @@ RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.KILL)) + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE, + new AppLogAggregationStatusUpdateTransition()) // Transitions from FAILED state // ignorable transitions .addTransition(RMAppState.FAILED, RMAppState.FAILED, EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + .addTransition(RMAppState.FAILED, RMAppState.FAILED, + RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE, + new AppLogAggregationStatusUpdateTransition()) // Transitions from KILLED state // ignorable transitions @@ -249,7 +264,9 @@ RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) - + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE, + new AppLogAggregationStatusUpdateTransition()) .installTopology(); private final StateMachine @@ -302,6 +319,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.writeLock = lock.writeLock(); this.stateMachine = stateMachineFactory.make(this); + long timeout = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_MAX_WAIT_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_MAX_WAIT_SECONDS); + if (timeout <= 0) { + LOG.info("The value of " + + YarnConfiguration.DEFAULT_LOG_AGGREGATION_MAX_WAIT_SECONDS + + " is less than 0. Using the default value : " + + YarnConfiguration.DEFAULT_LOG_AGGREGATION_MAX_WAIT_SECONDS); + this.logAggregationTimeOut = + YarnConfiguration.DEFAULT_LOG_AGGREGATION_MAX_WAIT_SECONDS * 1000; + } else { + this.logAggregationTimeOut = timeout * 1000; + } } @Override @@ -511,7 +541,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, this.name, host, rpcPort, clientToAMToken, createApplicationState(), diags, trackingUrl, this.startTime, this.finishTime, finishState, - appUsageReport, origTrackingUrl, progress, this.applicationType, + appUsageReport, origTrackingUrl, progress, this.applicationType, + getLogAggregationState(), amrmToken); } finally { this.readLock.unlock(); @@ -889,6 +920,15 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } + private static class AppLogAggregationStatusUpdateTransition extends RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppLogAggregationStatusUpdateEvent updateEvent = + (RMAppLogAggregationStatusUpdateEvent) event; + app.updateLogAggregationStatus(updateEvent.getApplicationLogStatus(), + updateEvent.getNodeId()); + } + } private static class AppKilledTransition extends FinalTransition { @Override @@ -911,6 +951,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { super.transition(app, event); } } + private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -933,6 +974,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { @SuppressWarnings("unchecked") public void transition(RMAppImpl app, RMAppEvent event) { Set nodes = getNodesOnWhichAttemptRan(app); + app.expectedNumOfAppLogStatus = nodes.size(); for (NodeId nodeId : nodes) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); @@ -1017,4 +1059,46 @@ public YarnApplicationState createApplicationState() { throw new YarnRuntimeException("Unknown state passed!"); } } + + private void updateLogAggregationStatus( + ApplicationLogAggregationStatus status, NodeId nodeId) { + if (status.getLogAggregationState() == LogAggregationState.FINISHED) { + appLogAggregationCompleted.put(nodeId, status); + } else if (status.getLogAggregationState() == LogAggregationState.FAILED) { + appLogAggregationFailed.put(nodeId, status); + } + if (status.getDiagnostic() != null && !status.getDiagnostic().isEmpty()) { + this.diagnostics.append(status.getDiagnostic() + "\n"); + } + } + + @Override + public LogAggregationState getLogAggregationState() { + this.readLock.lock(); + try { + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + return LogAggregationState.DISABLED; + } + RMAppState currentState = getState(); + if (currentState != RMAppState.FINISHED + && currentState != RMAppState.KILLED + && currentState != RMAppState.FAILED) { + return LogAggregationState.NOT_STARTED; + } + if (appLogAggregationFailed.size() > 0 || expectedNumOfAppLogStatus == 0) { + return LogAggregationState.FAILED; + } else if (appLogAggregationCompleted.size() == expectedNumOfAppLogStatus) { + return LogAggregationState.FINISHED; + } else { + if (System.currentTimeMillis() - this.finishTime + <= this.logAggregationTimeOut) { + return LogAggregationState.IN_PROGRESS; + } + return LogAggregationState.TIME_OUT; + } + } finally { + this.readLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java new file mode 100644 index 0000000..135332b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; + +public class RMAppLogAggregationStatusUpdateEvent extends RMAppEvent { + + private final NodeId nodeId; + private final ApplicationLogAggregationStatus applicationLogStatus; + + public RMAppLogAggregationStatusUpdateEvent(ApplicationId appId, + NodeId nodeId, ApplicationLogAggregationStatus applicationLogStatus) { + super(appId, RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE); + this.nodeId = nodeId; + this.applicationLogStatus = applicationLogStatus; + } + + public NodeId getNodeId() { + return this.nodeId; + }; + + public ApplicationLogAggregationStatus getApplicationLogStatus() { + return this.applicationLogStatus; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 52bc285..d37e000 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -48,11 +48,14 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLogAggregationStatusUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -648,6 +651,11 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { statusEvent.getKeepAliveAppIds()); } + if (statusEvent.getApplicationLogAggregationStatus() != null + && !statusEvent.getApplicationLogAggregationStatus().isEmpty()) { + rmNode.sendApplicationLogStatus(statusEvent + .getApplicationLogAggregationStatus()); + } return NodeState.RUNNING; } } @@ -694,6 +702,20 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return latestContainerInfoList; } + private void sendApplicationLogStatus( + List applicationLogStatus) { + for(ApplicationLogAggregationStatus status : applicationLogStatus) { + RMApp app = + context.getRMApps().get( + status.getApplicationId()); + if (app != null) { + this.context.getDispatcher().getEventHandler().handle( + new RMAppLogAggregationStatusUpdateEvent( + status.getApplicationId(), this.nodeId, status)); + } + } + } + @VisibleForTesting public void setNextHeartBeat(boolean nextHeartBeat) { this.nextHeartBeat = nextHeartBeat; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index abfacbb..c8234de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; public class RMNodeStatusEvent extends RMNodeEvent { @@ -32,15 +33,18 @@ private final List containersCollection; private final NodeHeartbeatResponse latestResponse; private final List keepAliveAppIds; + private final List applicationLogStatus; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, - NodeHeartbeatResponse latestResponse) { + NodeHeartbeatResponse latestResponse, + List applicationLogStatus) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; + this.applicationLogStatus = applicationLogStatus; } public NodeHealthStatus getNodeHealthStatus() { @@ -58,4 +62,9 @@ public NodeHeartbeatResponse getLatestResponse() { public List getKeepAliveAppIds() { return this.keepAliveAppIds; } + + public List + getApplicationLogAggregationStatus() { + return this.applicationLogStatus; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index c2cf147..a910455 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -174,4 +174,28 @@ public NodeHeartbeatResponse nodeHeartbeat(Map nms = new ArrayList(); + private MockRM rm1 = null; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + conf = new YarnConfiguration(); + } + + @After + public void teardown() throws Exception { + if (rm1 != null) { + rm1.serviceStop(); + } + } + + @Test + public void testLogAggregationDisabledState() throws Exception { + RMApp app = createFinishedRMApp(conf); + waitForLogAggregationState(app, LogAggregationState.DISABLED); + checkAppLogAggregationStateFromAppReport(conf, app, LogAggregationState.DISABLED); + } + + @Test + public void testLogAggregationInProgressState() throws Exception { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createFinishedRMApp(conf); + + for (int i=1; i < nms.size(); i++) { + nms.get(i).nodeHeartbeat(createNodeStatus(nms.get(i).getNodeId(), + app.getApplicationId(), LogAggregationState.FINISHED)); + } + waitForLogAggregationState(app, LogAggregationState.IN_PROGRESS); + checkAppLogAggregationStateFromAppReport(conf, app, LogAggregationState.IN_PROGRESS); + } + + @Test + public void testLogAggregationFinishedState() throws Exception { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createFinishedRMApp(conf); + + for (MockNM nm : nms) { + nm.nodeHeartbeat(createNodeStatus(nm.getNodeId(), + app.getApplicationId(), LogAggregationState.FINISHED)); + } + waitForLogAggregationState(app, LogAggregationState.FINISHED); + checkAppLogAggregationStateFromAppReport(conf, app, LogAggregationState.FINISHED); + } + + @Test + public void testLogAggregationFailedState() throws Exception{ + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + RMApp app = createFinishedRMApp(conf); + + nms.get(0).nodeHeartbeat(createNodeStatus(nms.get(0).getNodeId(), + app.getApplicationId(), LogAggregationState.FAILED)); + for (int i=1; i < nms.size(); i++) { + nms.get(i).nodeHeartbeat(createNodeStatus(nms.get(i).getNodeId(), + app.getApplicationId(), LogAggregationState.FINISHED)); + } + waitForLogAggregationState(app, LogAggregationState.FAILED); + checkAppLogAggregationStateFromAppReport(conf, app, LogAggregationState.FAILED); + } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + for (MockNM mockNM : nms) { + attempt.handle(createRMAppAttemptContainerAcquiredEvent( + attempt.getAppAttemptId(), mockNM.getNodeId())); + } + return am; + } + + private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, + MockAM am, FinishApplicationMasterRequest req) throws Exception { + am.unregisterAppAttempt(req); + am.waitForState(RMAppAttemptState.FINISHING); + nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + } + + private void waitForLogAggregationState(RMApp rmApp, + LogAggregationState logAggregationState) + throws Exception { + int timeoutSecs = 0; + while (rmApp.getLogAggregationState() != logAggregationState + && timeoutSecs++ < 40) { + Thread.sleep(2000); + } + Assert.assertEquals("App state is not correct (timedout)", + rmApp.getLogAggregationState(), + logAggregationState); + } + + private List createNMs(MockRM rm1) throws Exception{ + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15121, rm1.getResourceTrackerService()); + nm1.registerNode(); + nms.add(nm1); + + MockNM nm2 = + new MockNM("127.0.0.1:2345", 15122, rm1.getResourceTrackerService()); + nm2.registerNode(); + nms.add(nm2); + + MockNM nm3 = + new MockNM("127.0.0.1:3456", 15123, rm1.getResourceTrackerService()); + nm3.registerNode(); + nms.add(nm3); + + MockNM nm4 = + new MockNM("127.0.0.1:4567", 15124, rm1.getResourceTrackerService()); + nm4.registerNode(); + nms.add(nm4); + + return nms; + } + + private RMAppAttemptContainerAcquiredEvent + createRMAppAttemptContainerAcquiredEvent( + ApplicationAttemptId appAttemptId, NodeId nodeId) { + Container mockContainer = mock(Container.class); + when(mockContainer.getNodeId()).thenReturn(nodeId); + RMAppAttemptContainerAcquiredEvent event = + new RMAppAttemptContainerAcquiredEvent(appAttemptId, mockContainer); + return event; + } + + private NodeHealthStatus createNodeHealthStatus() { + NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); + healthStatus.setHealthReport(""); + healthStatus.setIsNodeHealthy(true); + healthStatus.setLastHealthReportTime(1); + return healthStatus; + } + + private NodeStatus createNodeStatus(NodeId nodeId, + ApplicationId appId, LogAggregationState logAggregationState) { + NodeStatus status = Records.newRecord(NodeStatus.class); + status.setNodeId(nodeId); + status.setNodeHealthStatus(createNodeHealthStatus()); + ApplicationLogAggregationStatus appStatus = + ApplicationLogAggregationStatus.newInstance(appId, + logAggregationState, null, ""); + status.setApplicationLogAggregationStatuses(Collections + .singletonList(appStatus)); + return status; + } + + private RMApp createFinishedRMApp(YarnConfiguration conf) throws Exception{ + + // start RM + rm1 = new MockRM(conf) { + protected ClientRMService createClientRMService() { + return new ClientRMService(this.rmContext, scheduler, + this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, + this.rmDTSecretManager); + }; + }; + rm1.start(); + + createNMs(rm1); + + // create an app and finish the app. + RMApp app = rm1.submitApp(200); + MockAM am = launchAM(app, rm1, nms.get(0)); + + // unregister am + FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "diagnostics", "trackingUrl"); + finishApplicationMaster(app, rm1, nms.get(0), am, req); + return app; + } + + private void checkAppLogAggregationStateFromAppReport(YarnConfiguration conf, + RMApp app, + LogAggregationState logAggregationState) throws Exception { + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress rmAddress = rm1.getClientRMService().getBindAddress(); + ApplicationClientProtocol client = + (ApplicationClientProtocol) rpc + .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + GetApplicationReportRequest request = + GetApplicationReportRequest.newInstance(app.getApplicationId()); + GetApplicationReportResponse appReportResponse = + client.getApplicationReport(request); + Assert.assertEquals(appReportResponse.getApplicationReport() + .getLogAggregationState(), logAggregationState); + rpc.stopProxy(client, conf); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 82046c7..f097471 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -465,7 +465,7 @@ private RMNodeImpl getUnhealthyNode() { NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); node.handle(new RMNodeStatusEvent(node.getNodeID(), status, - new ArrayList(), null, null)); + new ArrayList(), null, null, null)); Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 73b9cf7..5344c2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -153,6 +154,11 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public LogAggregationState getLogAggregationState() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bcb2f6f..4f39464 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -226,4 +227,9 @@ public boolean isAppSafeToUnregister() { public YarnApplicationState createApplicationState() { return null; } + + @Override + public LogAggregationState getLogAggregationState() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index b26c37f..9f806bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -149,7 +149,7 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException, NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false, "test health report", System.currentTimeMillis()); node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth, - new ArrayList(), null, null)); + new ArrayList(), null, null, null)); rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY); ClientResponse response =