diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
index 03552e4..1b26cd3 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
@@ -90,7 +90,7 @@ private ApplicationReport getUnknownApplicationReport() {
return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
- YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ YarnConfiguration.DEFAULT_APPLICATION_TYPE, null, null);
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
index 7b47711..38a1b80 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -470,9 +471,10 @@ private ApplicationReport getFinishedApplicationReport() {
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0);
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
- "appname", "host", 124, null, YarnApplicationState.FINISHED,
- "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ "appname", "host", 124, null, YarnApplicationState.FINISHED,
+ "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
+ "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE,
+ LogAggregationState.DISABLE, null);
}
private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -480,9 +482,11 @@ private ApplicationReport getRunningApplicationReport(String host, int port) {
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0);
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
- "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
- "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
- YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ "appname", host, port, null, YarnApplicationState.RUNNING,
+ "diagnostics",
+ "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
+ YarnConfiguration.DEFAULT_APPLICATION_TYPE,
+ LogAggregationState.DISABLE, null);
}
private ResourceMgrDelegate getRMDelegate() throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java
index 0854fdb..d49a3ef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java
@@ -40,6 +40,7 @@
*
{@link YarnApplicationState} of the application.
* Diagnostic information in case of errors.
* Start time of the application.
+ * {@link LogAggregationState} of the application.
* Client {@link Token} of the application (if security is enabled).
*
*
@@ -58,7 +59,8 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
YarnApplicationState state, String diagnostics, String url,
long startTime, long finishTime, FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
- float progress, String applicationType, Token amRmToken) {
+ float progress, String applicationType,
+ LogAggregationState logAggregationState, Token amRmToken) {
ApplicationReport report = Records.newRecord(ApplicationReport.class);
report.setApplicationId(applicationId);
report.setCurrentApplicationAttemptId(applicationAttemptId);
@@ -78,6 +80,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
report.setOriginalTrackingUrl(origTrackingUrl);
report.setProgress(progress);
report.setApplicationType(applicationType);
+ report.setLogAggregationState(logAggregationState);
report.setAMRMToken(amRmToken);
return report;
}
@@ -349,4 +352,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
@Stable
public abstract Token getAMRMToken();
+ /**
+ * Get the application's log aggregation state
+ * @return application's log aggregation state
+ */
+ @Public
+ @Stable
+ public abstract LogAggregationState getLogAggregationState();
+
+ @Private
+ @Stable
+ public abstract void setLogAggregationState(
+ LogAggregationState logAggregationState);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java
new file mode 100644
index 0000000..b94c61e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+
+/**
+ * Enumeration of various states of log aggregation.
+ */
+
+@Public
+@Stable
+public enum LogAggregationState {
+ /**The log aggregation is disabled. */
+ DISABLE,
+
+ /** The log aggregation is not started. */
+ NOT_START,
+
+ /** The log aggregation is in progress. */
+ IN_PROGRESS,
+
+ /** The log aggregation is failed. */
+ FAILED,
+
+ /** The log aggregation is finished. */
+ COMPLETED
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3192306..adcde9e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -151,7 +151,8 @@ message ApplicationReportProto {
optional ApplicationAttemptIdProto currentApplicationAttemptId = 16;
optional float progress = 17;
optional string applicationType = 18;
- optional hadoop.common.TokenProto am_rm_token = 19;
+ optional LogAggregationStateProto logAggregationState = 19;
+ optional hadoop.common.TokenProto am_rm_token = 20;
}
enum NodeStateProto {
@@ -180,7 +181,13 @@ message NodeReportProto {
optional int64 last_health_report_time = 9;
}
-
+enum LogAggregationStateProto {
+ LOG_AGGREGATION_DISABLE = 0;
+ LOG_AGGREGATION_NOT_START = 1;
+ LOG_AGGREGATION_IN_PROGRESS = 2;
+ LOG_AGGREGATION_FAILED = 3;
+ LOG_AGGREGATION_COMPLETED = 4;
+}
////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index 155ba5d..6ec4644 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index a7b7d65..37c5f3e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -278,6 +278,8 @@ private void printApplicationReport(String applicationId)
appReportStr.println(appReport.getYarnApplicationState());
appReportStr.print("\tFinal-State : ");
appReportStr.println(appReport.getFinalApplicationStatus());
+ appReportStr.print("\tLOG-AGGREGATION-State : ");
+ appReportStr.println(appReport.getLogAggregationState());
appReportStr.print("\tTracking-URL : ");
appReportStr.println(appReport.getOriginalTrackingUrl());
appReportStr.print("\tRPC Port : ");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 826433d..3fd1eba 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -271,7 +272,8 @@ public void setYarnApplicationState(YarnApplicationState state) {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.NOT_START, null);
List applicationReports =
new ArrayList();
applicationReports.add(newApplicationReport);
@@ -282,7 +284,7 @@ public void setYarnApplicationState(YarnApplicationState state) {
"user2", "queue2", "appname2", "host2", 125, null,
YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
- null);
+ LogAggregationState.COMPLETED, null);
applicationReports.add(newApplicationReport2);
ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7);
@@ -291,7 +293,7 @@ public void setYarnApplicationState(YarnApplicationState state) {
"user3", "queue3", "appname3", "host3", 126, null,
YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE",
- null);
+ LogAggregationState.NOT_START, null);
applicationReports.add(newApplicationReport3);
ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8);
@@ -302,7 +304,7 @@ public void setYarnApplicationState(YarnApplicationState state) {
"user4", "queue4", "appname4", "host4", 127, null,
YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f,
- "NON-MAPREDUCE", null);
+ "NON-MAPREDUCE", LogAggregationState.FAILED, null);
applicationReports.add(newApplicationReport4);
return applicationReports;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index 1d08f24..5bdd0f7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -84,7 +85,8 @@ public void testGetApplicationReport() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.DISABLE, null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
int result = cli.run(new String[] { "-status", applicationId.toString() });
@@ -103,6 +105,7 @@ public void testGetApplicationReport() throws Exception {
pw.println("\tProgress : 53.79%");
pw.println("\tState : FINISHED");
pw.println("\tFinal-State : SUCCEEDED");
+ pw.println("\tLOG-AGGREGATION-State : DISABLE");
pw.println("\tTracking-URL : N/A");
pw.println("\tRPC Port : 124");
pw.println("\tAM Host : host");
@@ -138,7 +141,8 @@ public void testGetApplications() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.NOT_START, null);
List applicationReports = new ArrayList();
applicationReports.add(newApplicationReport);
@@ -147,8 +151,8 @@ public void testGetApplications() throws Exception {
applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2),
"user2", "queue2", "appname2", "host2", 125, null,
YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
- null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
+ LogAggregationState.COMPLETED, null);
applicationReports.add(newApplicationReport2);
ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7);
@@ -157,7 +161,7 @@ public void testGetApplications() throws Exception {
"user3", "queue3", "appname3", "host3", 126, null,
YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE",
- null);
+ LogAggregationState.NOT_START, null);
applicationReports.add(newApplicationReport3);
ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8);
@@ -166,7 +170,7 @@ public void testGetApplications() throws Exception {
"user4", "queue4", "appname4", "host4", 127, null,
YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, "NON-MAPREDUCE",
- null);
+ LogAggregationState.FAILED, null);
applicationReports.add(newApplicationReport4);
ApplicationId applicationId5 = ApplicationId.newInstance(1234, 9);
@@ -175,7 +179,7 @@ public void testGetApplications() throws Exception {
"user5", "queue5", "appname5", "host5", 128, null,
YarnApplicationState.ACCEPTED, "diagnostics5", "url5", 5, 5,
FinalApplicationStatus.KILLED, null, "N/A", 0.93789f, "HIVE",
- null);
+ LogAggregationState.FAILED, null);
applicationReports.add(newApplicationReport5);
ApplicationId applicationId6 = ApplicationId.newInstance(1234, 10);
@@ -184,7 +188,7 @@ public void testGetApplications() throws Exception {
"user6", "queue6", "appname6", "host6", 129, null,
YarnApplicationState.SUBMITTED, "diagnostics6", "url6", 6, 6,
FinalApplicationStatus.KILLED, null, "N/A", 0.99789f, "PIG",
- null);
+ LogAggregationState.IN_PROGRESS, null);
applicationReports.add(newApplicationReport6);
// Test command yarn application -list
@@ -546,7 +550,8 @@ public void testKillApplication() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.COMPLETED, null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport2);
int result = cli.run(new String[] { "-kill", applicationId.toString() });
@@ -559,7 +564,8 @@ public void testKillApplication() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.NOT_START, null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
result = cli.run(new String[] { "-kill", applicationId.toString() });
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
index 9716f74..b65e18f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
@@ -34,6 +35,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.TextFormat;
@@ -516,4 +518,34 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
+
+ @Override
+ public LogAggregationState getLogAggregationState() {
+ ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if(!p.hasLogAggregationState()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationState());
+ }
+
+ private LogAggregationState convertFromProtoFormat(
+ LogAggregationStateProto p) {
+ return ProtoUtils.convertFromProtoFormat(p);
+ }
+
+ @Override
+ public void setLogAggregationState(
+ LogAggregationState logAggregationState) {
+ maybeInitBuilder();
+ if (logAggregationState == null) {
+ builder.clearLogAggregationState();
+ return;
+ }
+ builder.setLogAggregationState(convertToProtoFormat(logAggregationState));
+ }
+
+ private LogAggregationStateProto convertToProtoFormat(
+ LogAggregationState logAggregationState) {
+ return ProtoUtils.convertToProtoFormat(logAggregationState);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index b660f7d..6a9cb34 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -41,6 +42,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto;
@@ -206,4 +208,15 @@ public static ApplicationAccessType convertFromProtoFormat(
return ApplicationAccessType.valueOf(e.name().replace(
APP_ACCESS_TYPE_PREFIX, ""));
}
+
+ /*
+ * LogAggregationState
+ */
+ private static String LOG_AGGREGATION_PREFIX = "LOG_AGGREGATION_";
+ public static LogAggregationStateProto convertToProtoFormat(LogAggregationState e) {
+ return LogAggregationStateProto.valueOf(LOG_AGGREGATION_PREFIX + e.name());
+ }
+ public static LogAggregationState convertFromProtoFormat(LogAggregationStateProto e) {
+ return LogAggregationState.valueOf(e.name().replace(LOG_AGGREGATION_PREFIX, ""));
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
index 9302d4b..685bf45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
@@ -56,9 +57,10 @@ protected static ApplicationReport createApplicationReport(
ApplicationAttemptId.newInstance(appId, appAttemptIdInt);
ApplicationReport appReport =
ApplicationReport.newInstance(appId, appAttemptId, "user", "queue",
- "appname", "host", 124, null, YarnApplicationState.FINISHED,
- "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ "appname", "host", 124, null, YarnApplicationState.FINISHED,
+ "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
+ "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE,
+ LogAggregationState.DISABLE, null);
return appReport;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerLogAggregationStatus.java
new file mode 100644
index 0000000..cd7cd92
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerLogAggregationStatus.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * ContainerLogAggregationStatus is a data structure of the
+ * log aggregation state for the Container.
+ *
+ * It includes information such as:
+ *
+ * - ContainerId
+ * - log aggregation state for this container
+ *
+ *
+ *
+ * @see NodeStatus
+ */
+
+public abstract class ContainerLogAggregationStatus {
+
+ @Private
+ public static ContainerLogAggregationStatus newInstance(
+ ContainerId containerId, LogAggregationState logAggregationState) {
+ ContainerLogAggregationStatus status =
+ Records.newRecord(ContainerLogAggregationStatus.class);
+ status.setContainerId(containerId);
+ status.setLogAggregationState(logAggregationState);
+ return status;
+ }
+
+ @Public
+ @Stable
+ public abstract ContainerId getContainerId();
+
+ @Private
+ @Unstable
+ public abstract void setContainerId(ContainerId containerId);
+
+ @Public
+ @Stable
+ public abstract LogAggregationState getLogAggregationState();
+
+ @Private
+ @Unstable
+ public abstract void setLogAggregationState(
+ LogAggregationState logAggregationState);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 8e98703..0f9be73 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -41,4 +41,9 @@ public abstract void setContainersStatuses(
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
+
+ public abstract List
+ getContainerLogAggregationStatus();
+ public abstract void setContainerLogAggregationStatus(
+ List containerLogAggregationStatus);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerLogAggregationStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerLogAggregationStatusPBImpl.java
new file mode 100644
index 0000000..a1b0ae8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerLogAggregationStatusPBImpl.java
@@ -0,0 +1,127 @@
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerLogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerLogAggregationStatusProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
+import com.google.protobuf.TextFormat;
+
+
+public class ContainerLogAggregationStatusPBImpl extends ContainerLogAggregationStatus{
+ ContainerLogAggregationStatusProto proto = ContainerLogAggregationStatusProto.getDefaultInstance();
+ ContainerLogAggregationStatusProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ContainerId containerId = null;
+
+ public ContainerLogAggregationStatusPBImpl() {
+ builder = ContainerLogAggregationStatusProto.newBuilder();
+ }
+
+ public ContainerLogAggregationStatusPBImpl (ContainerLogAggregationStatusProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ContainerLogAggregationStatusProto getProto() {
+ mergeLocalToProto();
+ this.proto = this.viaProto ? this.proto : this.builder.build();
+ this.viaProto = true;
+ return this.proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToProto() {
+ if (this.viaProto)
+ maybeInitBuilder();
+ this.proto = this.builder.build();
+
+ this.viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (this.viaProto || this.builder == null) {
+ this.builder = ContainerLogAggregationStatusProto.newBuilder(this.proto);
+ }
+ this.viaProto = false;
+ }
+
+ @Override
+ public ContainerId getContainerId() {
+ ContainerLogAggregationStatusProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.containerId != null) {
+ return this.containerId;
+ }
+ if (!p.hasContainerId()) {
+ return null;
+ }
+ this.containerId = convertFromProtoFormat(p.getContainerId());
+
+ return this.containerId;
+ }
+
+ private ContainerId convertFromProtoFormat(ContainerIdProto proto) {
+ return new ContainerIdPBImpl(proto);
+ }
+
+ @Override
+ public void setContainerId(ContainerId containerId) {
+ maybeInitBuilder();
+ if (containerId == null)
+ builder.clearContainerId();
+ this.containerId = containerId;
+ }
+
+ @Override
+ public LogAggregationState getLogAggregationState() {
+ ContainerLogAggregationStatusProtoOrBuilder p = viaProto ? proto : builder;
+ if(!p.hasLogAggregationState()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationState());
+ }
+
+ private LogAggregationState convertFromProtoFormat(
+ LogAggregationStateProto p) {
+ return ProtoUtils.convertFromProtoFormat(p);
+ }
+
+ @Override
+ public void setLogAggregationState(LogAggregationState logAggregationState) {
+ maybeInitBuilder();
+ if (logAggregationState == null) {
+ builder.clearLogAggregationState();
+ return;
+ }
+ builder.setLogAggregationState(convertToProtoFormat(logAggregationState));
+ }
+
+ private LogAggregationStateProto convertToProtoFormat(
+ LogAggregationState logAggregationState) {
+ return ProtoUtils.convertToProtoFormat(logAggregationState);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8ed7849..e472379 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -33,9 +33,11 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerLogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -50,6 +52,7 @@
private List containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List keepAliveApplications = null;
+ private List logAggregationStatus = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@@ -310,4 +313,38 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
return ((ApplicationIdPBImpl)c).getProto();
}
+
+ @Override
+ public List getContainerLogAggregationStatus() {
+ initContainerLogAggregationStatus();
+ return this.logAggregationStatus;
+ }
+
+ @Override
+ public void setContainerLogAggregationStatus(
+ List containerLogAggregationStatus) {
+ if (containerLogAggregationStatus == null) {
+ builder.clearContainerLogAggregationStatus();
+ }
+ this.logAggregationStatus = containerLogAggregationStatus;
+ }
+
+ private synchronized void initContainerLogAggregationStatus() {
+ if (this.logAggregationStatus != null) {
+ return;
+ }
+ NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+ List list =
+ p.getContainerLogAggregationStatusList();
+ this.logAggregationStatus = new ArrayList();
+
+ for (ContainerLogAggregationStatusProto c : list) {
+ this.logAggregationStatus.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private ContainerLogAggregationStatusPBImpl convertFromProtoFormat(
+ ContainerLogAggregationStatusProto c) {
+ return new ContainerLogAggregationStatusPBImpl(c);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 1b2a03e..00d9488 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -312,7 +313,8 @@ public static ApplicationReport newApplicationReport(
String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
- float progress, String appType, Token amRmToken) {
+ float progress, String appType, LogAggregationState logAggregationState,
+ Token amRmToken) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@@ -333,6 +335,7 @@ public static ApplicationReport newApplicationReport(
report.setOriginalTrackingUrl(origTrackingUrl);
report.setProgress(progress);
report.setApplicationType(appType);
+ report.setLogAggregationState(logAggregationState);
report.setAMRMToken(amRmToken);
return report;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 4f5d168..a62b2af 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -30,12 +30,18 @@ enum NodeActionProto {
SHUTDOWN = 2;
}
+message ContainerLogAggregationStatusProto {
+ optional ContainerIdProto container_id = 1;
+ optional LogAggregationStateProto logAggregationState = 2;
+}
+
message NodeStatusProto {
optional NodeIdProto node_id = 1;
optional int32 response_id = 2;
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
repeated ApplicationIdProto keep_alive_applications = 5;
+ repeated ContainerLogAggregationStatusProto containerLogAggregationStatus = 6;
}
message MasterKeyProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 7995fb3..42c2f71 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -337,7 +337,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
- remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+ remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse,
+ remoteNodeStatus.getContainerLogAggregationStatus()));
return nodeHeartBeatResponse;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fadaa3b..8a42302 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -28,8 +28,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -211,4 +213,16 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
+
+ /**
+ * Update the logAggregation state for the container that launched for this RMApp
+ * @param status the ContainerLogAggregationStatus
+ */
+ void updateLogAggregationStatus(ContainerLogAggregationStatus status);
+
+ /**
+ * Return the log aggregation State
+ * @return the log aggregation State of the RMApp
+ */
+ LogAggregationState getLogAggregationState();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index a8a0af4..6a40ba4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -21,10 +21,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -40,7 +42,9 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -50,14 +54,13 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -99,7 +102,11 @@
= new LinkedHashMap();
private final long submitTime;
private final Set updatedNodes = new HashSet();
+ private final Map containerLogAggregationStatus =
+ new HashMap();
private final String applicationType;
+ private final AtomicInteger LogAggregationCompleted = new AtomicInteger(0);
+ private final AtomicInteger LogAggregationFailed = new AtomicInteger(0);
// Mutable fields
private long startTime;
@@ -492,7 +499,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
this.name, host, rpcPort, clientToAMToken,
createApplicationState(), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
- appUsageReport, origTrackingUrl, progress, this.applicationType,
+ appUsageReport, origTrackingUrl, progress, this.applicationType,
+ getLogAggregationState(),
amrmToken);
} finally {
this.readLock.unlock();
@@ -715,6 +723,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
super.transition(app, event);
}
}
+
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -736,6 +745,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
@SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) {
+ if (app.getCurrentAppAttempt() != null) {
+ app.initialContainerLogAggregationStatus(app.getCurrentAppAttempt()
+ .getAllContainers());
+ }
Set nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) {
app.handler.handle(
@@ -847,4 +860,63 @@ private void removeApplicationState(){
isAppRemovalRequestSent = true;
}
}
+
+ @Override
+ public void updateLogAggregationStatus(ContainerLogAggregationStatus status) {
+ this.writeLock.lock();
+ try {
+ if (containerLogAggregationStatus.containsKey(status.getContainerId())) {
+ LogAggregationState currentState =
+ containerLogAggregationStatus.get(status.getContainerId());
+ if (currentState != LogAggregationState.COMPLETED
+ && currentState != LogAggregationState.FAILED) {
+ if (status.getLogAggregationState() == LogAggregationState.COMPLETED) {
+ LogAggregationCompleted.getAndAdd(1);
+ } else if (status.getLogAggregationState() == LogAggregationState.FAILED) {
+ LogAggregationFailed.getAndAdd(1);
+ }
+ containerLogAggregationStatus.put(status.getContainerId(),
+ status.getLogAggregationState());
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private void
+ initialContainerLogAggregationStatus(Set containers) {
+ for (ContainerId container : containers) {
+ containerLogAggregationStatus.put(container,
+ LogAggregationState.NOT_START);
+ }
+ }
+
+ @Override
+ public LogAggregationState getLogAggregationState() {
+ this.readLock.lock();
+ try {
+ if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
+ return LogAggregationState.DISABLE;
+ }
+ RMAppState currentState = getState();
+ if (currentState != RMAppState.FINISHED
+ && currentState != RMAppState.KILLED
+ && currentState != RMAppState.FAILED) {
+ return LogAggregationState.NOT_START;
+ }
+ if (LogAggregationFailed.get() > 0
+ || containerLogAggregationStatus.size() == 0) {
+ return LogAggregationState.FAILED;
+ } else if (LogAggregationCompleted.get() == containerLogAggregationStatus
+ .size()) {
+ return LogAggregationState.COMPLETED;
+ } else {
+ return LogAggregationState.IN_PROGRESS;
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 335dbda..64f3b76 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -178,4 +179,10 @@
* @return the start time of the application.
*/
long getStartTime();
+
+ /**
+ * get All the containers which are launched for this RMAppAttempt
+ * @return set of containerIds
+ */
+ Set getAllContainers();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index f68a4a5..2823b49 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -137,6 +137,7 @@
new HashSet();
private final List justFinishedContainers =
new ArrayList();
+ private final Set containers = new HashSet();
private Container masterContainer;
private float progress = 0;
@@ -1204,6 +1205,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
// Put it in completedcontainers list
appAttempt.justFinishedContainers.add(containerStatus);
+ appAttempt.containers.add(containerStatus.getContainerId());
return RMAppAttemptState.RUNNING;
}
}
@@ -1231,6 +1233,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
// Normal container.
appAttempt.justFinishedContainers.add(containerStatus);
+ appAttempt.containers.add(containerStatus.getContainerId());
return RMAppAttemptState.FINISHING;
}
}
@@ -1293,4 +1296,17 @@ private void removeCredentials(RMAppAttemptImpl appAttempt) {
private static String sanitizeTrackingUrl(String url) {
return (url == null || url.trim().isEmpty()) ? "N/A" : url;
}
+
+ @Override
+ public Set getAllContainers() {
+ this.readLock.lock();
+ try {
+ if (this.masterContainer != null) {
+ this.containers.add(this.masterContainer.getId());
+ }
+ return this.containers;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7429100..2a062f2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -47,11 +47,13 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -637,6 +639,11 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
statusEvent.getKeepAliveAppIds());
}
+ if (statusEvent.getContainerLogAggregationStatus() != null
+ && !statusEvent.getContainerLogAggregationStatus().isEmpty()) {
+ rmNode.sendContainerLogAggregationStatus(statusEvent
+ .getContainerLogAggregationStatus());
+ }
return NodeState.RUNNING;
}
}
@@ -683,6 +690,19 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
return latestContainerInfoList;
}
+ private void sendContainerLogAggregationStatus(
+ List containerLogAggregationStatus) {
+ for(ContainerLogAggregationStatus status : containerLogAggregationStatus) {
+ RMApp app =
+ context.getRMApps().get(
+ status.getContainerId().getApplicationAttemptId()
+ .getApplicationId());
+ if (app != null) {
+ app.updateLogAggregationStatus(status);
+ }
+ }
+ }
+
@VisibleForTesting
public void setNextHeartBeat(boolean nextHeartBeat) {
this.nextHeartBeat = nextHeartBeat;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index abfacbb..78cc4dd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
public class RMNodeStatusEvent extends RMNodeEvent {
@@ -32,15 +33,18 @@
private final List containersCollection;
private final NodeHeartbeatResponse latestResponse;
private final List keepAliveAppIds;
+ private final List logAggregationStatus;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List collection, List keepAliveAppIds,
- NodeHeartbeatResponse latestResponse) {
+ NodeHeartbeatResponse latestResponse,
+ List logAggregationStatus) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
+ this.logAggregationStatus = logAggregationStatus;
}
public NodeHealthStatus getNodeHealthStatus() {
@@ -58,4 +62,8 @@ public NodeHeartbeatResponse getLatestResponse() {
public List getKeepAliveAppIds() {
return this.keepAliveAppIds;
}
+
+ public List getContainerLogAggregationStatus() {
+ return this.logAggregationStatus;
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java
new file mode 100644
index 0000000..bee42b5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java
@@ -0,0 +1,449 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ContainerLogAggregationStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.TestRMAppTransitions;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+public class TestLogAggregationStatus {
+ static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
+
+ private boolean isSecurityEnabled;
+ private Configuration conf;
+ private RMContext rmContext;
+ private static int maxAppAttempts =
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
+ private static int container_id = 1;
+ private ApplicationId applicationId = MockApps.newAppID(1);
+ private DrainDispatcher rmDispatcher;
+ private RMStateStore store;
+ private YarnScheduler scheduler;
+ private final int max_wait_time = 20;
+ private Container amContainer;
+
+ // ignore all the RM application attempt events
+ private static final class TestApplicationAttemptEventDispatcher implements
+ EventHandler {
+
+ private final RMContext rmContext;
+ public TestApplicationAttemptEventDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public void handle(RMAppAttemptEvent event) {
+ ApplicationId appId = event.getApplicationAttemptId().getApplicationId();
+ RMApp rmApp = this.rmContext.getRMApps().get(appId);
+ if (rmApp != null) {
+ try {
+ rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + appId, t);
+ }
+ }
+ }
+ }
+
+ // handle all the RM application events - same as in ResourceManager.java
+ private static final class TestApplicationEventDispatcher implements
+ EventHandler {
+
+ private final RMContext rmContext;
+ public TestApplicationEventDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public void handle(RMAppEvent event) {
+ ApplicationId appID = event.getApplicationId();
+ RMApp rmApp = this.rmContext.getRMApps().get(appID);
+ if (rmApp != null) {
+ try {
+ rmApp.handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + appID, t);
+ }
+ }
+ }
+ }
+
+ // handle all the RM application manager events - same as in
+ // ResourceManager.java
+ private static final class TestApplicationManagerEventDispatcher implements
+ EventHandler {
+ @Override
+ public void handle(RMAppManagerEvent event) {
+ }
+ }
+
+ // handle all the scheduler events - same as in ResourceManager.java
+ private static final class TestSchedulerEventDispatcher implements
+ EventHandler {
+ @Override
+ public void handle(SchedulerEvent event) {
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
+ if (isSecurityEnabled) {
+ authMethod = AuthenticationMethod.KERBEROS;
+ }
+ SecurityUtil.setAuthenticationMethod(authMethod, conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ rmDispatcher = new DrainDispatcher();
+ ContainerAllocationExpirer containerAllocationExpirer =
+ mock(ContainerAllocationExpirer.class);
+ AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
+ AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
+ store = mock(RMStateStore.class);
+ this.rmContext =
+ new RMContextImpl(rmDispatcher, store,
+ containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+ null, new AMRMTokenSecretManager(conf),
+ new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM());
+
+ rmDispatcher.register(RMAppAttemptEventType.class,
+ new TestApplicationAttemptEventDispatcher(this.rmContext));
+
+ rmDispatcher.register(RMAppEventType.class,
+ new TestApplicationEventDispatcher(rmContext));
+
+ rmDispatcher.register(RMAppManagerEventType.class,
+ new TestApplicationManagerEventDispatcher());
+
+ rmDispatcher.register(SchedulerEventType.class,
+ new TestSchedulerEventDispatcher());
+
+ ApplicationMasterLauncher mockAMLauncher = mock(ApplicationMasterLauncher.class);
+ doAnswer(new Answer