diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
index 03552e4..fc825cf 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
@@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -90,7 +91,7 @@ private ApplicationReport getUnknownApplicationReport() {
return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
- YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ YarnConfiguration.DEFAULT_APPLICATION_TYPE, LogAggregationState.DISABLED, null);
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
index 7b47711..abbb92e 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -470,9 +471,10 @@ private ApplicationReport getFinishedApplicationReport() {
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0);
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
- "appname", "host", 124, null, YarnApplicationState.FINISHED,
- "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ "appname", "host", 124, null, YarnApplicationState.FINISHED,
+ "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
+ "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE,
+ LogAggregationState.DISABLED, null);
}
private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -480,9 +482,11 @@ private ApplicationReport getRunningApplicationReport(String host, int port) {
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0);
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
- "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
- "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
- YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ "appname", host, port, null, YarnApplicationState.RUNNING,
+ "diagnostics",
+ "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
+ YarnConfiguration.DEFAULT_APPLICATION_TYPE,
+ LogAggregationState.DISABLED, null);
}
private ResourceMgrDelegate getRMDelegate() throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java
index 0854fdb..d49a3ef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java
@@ -40,6 +40,7 @@
*
{@link YarnApplicationState} of the application.
* Diagnostic information in case of errors.
* Start time of the application.
+ * {@link LogAggregationState} of the application.
* Client {@link Token} of the application (if security is enabled).
*
*
@@ -58,7 +59,8 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
YarnApplicationState state, String diagnostics, String url,
long startTime, long finishTime, FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
- float progress, String applicationType, Token amRmToken) {
+ float progress, String applicationType,
+ LogAggregationState logAggregationState, Token amRmToken) {
ApplicationReport report = Records.newRecord(ApplicationReport.class);
report.setApplicationId(applicationId);
report.setCurrentApplicationAttemptId(applicationAttemptId);
@@ -78,6 +80,7 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
report.setOriginalTrackingUrl(origTrackingUrl);
report.setProgress(progress);
report.setApplicationType(applicationType);
+ report.setLogAggregationState(logAggregationState);
report.setAMRMToken(amRmToken);
return report;
}
@@ -349,4 +352,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
@Stable
public abstract Token getAMRMToken();
+ /**
+ * Get the application's log aggregation state
+ * @return application's log aggregation state
+ */
+ @Public
+ @Stable
+ public abstract LogAggregationState getLogAggregationState();
+
+ @Private
+ @Stable
+ public abstract void setLogAggregationState(
+ LogAggregationState logAggregationState);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java
new file mode 100644
index 0000000..5a01938
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationState.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+
+/**
+ * Enumeration of various states of log aggregation.
+ */
+
+@Public
+@Stable
+public enum LogAggregationState {
+ /**The log aggregation is disabled. */
+ DISABLED,
+
+ /** The log aggregation is not started. */
+ NOT_STARTED,
+
+ /** The log aggregation is in progress. */
+ IN_PROGRESS,
+
+ /** The log aggregation is failed. */
+ FAILED,
+
+ /** The log aggregation is finished. */
+ COMPLETED,
+
+ /** The time which waits for log aggregation to finish
+ * exceeds the maximum wait time.
+ */
+ TIME_OUT
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 09f6b6e..fa9fde5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -495,13 +495,20 @@
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
- /**
+ /**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
*/
public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX
+ "log-aggregation.retain-seconds";
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
+
+ /**
+ * How long to wait for log aggregation to finish, -1 disables.
+ */
+ public static final String LOG_AGGREGATION_WATTING_MS = YARN_PREFIX
+ + "log-aggregation.waitting-ms";
+ public static final long DEFAULT_LOG_AGGREGATION_WATTING_MS = 10 * 60 * 1000;
/**
* How long to wait between aggregated log retention checks. If set to
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 9d4d59e..689ac59 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -156,7 +156,8 @@ message ApplicationReportProto {
optional ApplicationAttemptIdProto currentApplicationAttemptId = 16;
optional float progress = 17;
optional string applicationType = 18;
- optional hadoop.common.TokenProto am_rm_token = 19;
+ optional LogAggregationStateProto logAggregationState = 19;
+ optional hadoop.common.TokenProto am_rm_token = 20;
}
enum NodeStateProto {
@@ -185,7 +186,14 @@ message NodeReportProto {
optional int64 last_health_report_time = 9;
}
-
+enum LogAggregationStateProto {
+ LOG_AGGREGATION_DISABLED = 0;
+ LOG_AGGREGATION_NOT_STARTED = 1;
+ LOG_AGGREGATION_IN_PROGRESS = 2;
+ LOG_AGGREGATION_FAILED = 3;
+ LOG_AGGREGATION_COMPLETED = 4;
+ LOG_AGGREGATION_TIME_OUT = 5;
+}
////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index a7b7d65..37c5f3e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -278,6 +278,8 @@ private void printApplicationReport(String applicationId)
appReportStr.println(appReport.getYarnApplicationState());
appReportStr.print("\tFinal-State : ");
appReportStr.println(appReport.getFinalApplicationStatus());
+ appReportStr.print("\tLOG-AGGREGATION-State : ");
+ appReportStr.println(appReport.getLogAggregationState());
appReportStr.print("\tTracking-URL : ");
appReportStr.println(appReport.getOriginalTrackingUrl());
appReportStr.print("\tRPC Port : ");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 826433d..fdfd3fc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -271,7 +272,8 @@ public void setYarnApplicationState(YarnApplicationState state) {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.NOT_STARTED, null);
List applicationReports =
new ArrayList();
applicationReports.add(newApplicationReport);
@@ -282,7 +284,7 @@ public void setYarnApplicationState(YarnApplicationState state) {
"user2", "queue2", "appname2", "host2", 125, null,
YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
- null);
+ LogAggregationState.COMPLETED, null);
applicationReports.add(newApplicationReport2);
ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7);
@@ -291,7 +293,7 @@ public void setYarnApplicationState(YarnApplicationState state) {
"user3", "queue3", "appname3", "host3", 126, null,
YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE",
- null);
+ LogAggregationState.NOT_STARTED, null);
applicationReports.add(newApplicationReport3);
ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8);
@@ -302,7 +304,7 @@ public void setYarnApplicationState(YarnApplicationState state) {
"user4", "queue4", "appname4", "host4", 127, null,
YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f,
- "NON-MAPREDUCE", null);
+ "NON-MAPREDUCE", LogAggregationState.FAILED, null);
applicationReports.add(newApplicationReport4);
return applicationReports;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index 1d08f24..1cf857a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -84,7 +85,8 @@ public void testGetApplicationReport() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.DISABLED, null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
int result = cli.run(new String[] { "-status", applicationId.toString() });
@@ -103,6 +105,7 @@ public void testGetApplicationReport() throws Exception {
pw.println("\tProgress : 53.79%");
pw.println("\tState : FINISHED");
pw.println("\tFinal-State : SUCCEEDED");
+ pw.println("\tLOG-AGGREGATION-State : DISABLED");
pw.println("\tTracking-URL : N/A");
pw.println("\tRPC Port : 124");
pw.println("\tAM Host : host");
@@ -138,7 +141,8 @@ public void testGetApplications() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.NOT_STARTED, null);
List applicationReports = new ArrayList();
applicationReports.add(newApplicationReport);
@@ -147,8 +151,8 @@ public void testGetApplications() throws Exception {
applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2),
"user2", "queue2", "appname2", "host2", 125, null,
YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
- null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN",
+ LogAggregationState.COMPLETED, null);
applicationReports.add(newApplicationReport2);
ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7);
@@ -157,7 +161,7 @@ public void testGetApplications() throws Exception {
"user3", "queue3", "appname3", "host3", 126, null,
YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE",
- null);
+ LogAggregationState.NOT_STARTED, null);
applicationReports.add(newApplicationReport3);
ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8);
@@ -166,7 +170,7 @@ public void testGetApplications() throws Exception {
"user4", "queue4", "appname4", "host4", 127, null,
YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, "NON-MAPREDUCE",
- null);
+ LogAggregationState.FAILED, null);
applicationReports.add(newApplicationReport4);
ApplicationId applicationId5 = ApplicationId.newInstance(1234, 9);
@@ -175,7 +179,7 @@ public void testGetApplications() throws Exception {
"user5", "queue5", "appname5", "host5", 128, null,
YarnApplicationState.ACCEPTED, "diagnostics5", "url5", 5, 5,
FinalApplicationStatus.KILLED, null, "N/A", 0.93789f, "HIVE",
- null);
+ LogAggregationState.FAILED, null);
applicationReports.add(newApplicationReport5);
ApplicationId applicationId6 = ApplicationId.newInstance(1234, 10);
@@ -184,7 +188,7 @@ public void testGetApplications() throws Exception {
"user6", "queue6", "appname6", "host6", 129, null,
YarnApplicationState.SUBMITTED, "diagnostics6", "url6", 6, 6,
FinalApplicationStatus.KILLED, null, "N/A", 0.99789f, "PIG",
- null);
+ LogAggregationState.IN_PROGRESS, null);
applicationReports.add(newApplicationReport6);
// Test command yarn application -list
@@ -546,7 +550,8 @@ public void testKillApplication() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.COMPLETED, null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport2);
int result = cli.run(new String[] { "-kill", applicationId.toString() });
@@ -559,7 +564,8 @@ public void testKillApplication() throws Exception {
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
- FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
+ FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN",
+ LogAggregationState.NOT_STARTED, null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
result = cli.run(new String[] { "-kill", applicationId.toString() });
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
index 9716f74..b65e18f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
@@ -34,6 +35,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.TextFormat;
@@ -516,4 +518,34 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
+
+ @Override
+ public LogAggregationState getLogAggregationState() {
+ ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if(!p.hasLogAggregationState()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationState());
+ }
+
+ private LogAggregationState convertFromProtoFormat(
+ LogAggregationStateProto p) {
+ return ProtoUtils.convertFromProtoFormat(p);
+ }
+
+ @Override
+ public void setLogAggregationState(
+ LogAggregationState logAggregationState) {
+ maybeInitBuilder();
+ if (logAggregationState == null) {
+ builder.clearLogAggregationState();
+ return;
+ }
+ builder.setLogAggregationState(convertToProtoFormat(logAggregationState));
+ }
+
+ private LogAggregationStateProto convertToProtoFormat(
+ LogAggregationState logAggregationState) {
+ return ProtoUtils.convertToProtoFormat(logAggregationState);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index b660f7d..6a9cb34 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -41,6 +42,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto;
@@ -206,4 +208,15 @@ public static ApplicationAccessType convertFromProtoFormat(
return ApplicationAccessType.valueOf(e.name().replace(
APP_ACCESS_TYPE_PREFIX, ""));
}
+
+ /*
+ * LogAggregationState
+ */
+ private static String LOG_AGGREGATION_PREFIX = "LOG_AGGREGATION_";
+ public static LogAggregationStateProto convertToProtoFormat(LogAggregationState e) {
+ return LogAggregationStateProto.valueOf(LOG_AGGREGATION_PREFIX + e.name());
+ }
+ public static LogAggregationState convertFromProtoFormat(LogAggregationStateProto e) {
+ return LogAggregationState.valueOf(e.name().replace(LOG_AGGREGATION_PREFIX, ""));
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
index 9302d4b..b0e7aa4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
@@ -56,9 +57,10 @@ protected static ApplicationReport createApplicationReport(
ApplicationAttemptId.newInstance(appId, appAttemptIdInt);
ApplicationReport appReport =
ApplicationReport.newInstance(appId, appAttemptId, "user", "queue",
- "appname", "host", 124, null, YarnApplicationState.FINISHED,
- "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
+ "appname", "host", 124, null, YarnApplicationState.FINISHED,
+ "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
+ "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE,
+ LogAggregationState.DISABLED, null);
return appReport;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java
new file mode 100644
index 0000000..782696c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ApplicationLogAggregationStatus.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * ApplicationLogStatus is a data structure of the
+ * log aggregation state for the Application.
+ *
+ * It includes information such as:
+ *
+ * - ApplicationId
+ * - LogAggregation State of this application
+ * - Set of LogAggregationState for all containers
+ * - Diagnostic information
+ *
+ *
+ *
+ * @see NodeStatus
+ */
+
+public abstract class ApplicationLogAggregationStatus {
+ @Private
+ public static ApplicationLogAggregationStatus newInstance(ApplicationId appId,
+ LogAggregationState appLogState,
+ Map containersLogStatus, String diagnostic) {
+ ApplicationLogAggregationStatus appLogStatus =
+ Records.newRecord(ApplicationLogAggregationStatus.class);
+ appLogStatus.setApplicationId(appId);
+ appLogStatus.setLogAggregationState(appLogState);
+ appLogStatus.setContainersLogStatus(containersLogStatus);
+ appLogStatus.setDiagnostic(diagnostic);
+ return appLogStatus;
+ }
+
+ @Public
+ @Stable
+ public abstract ApplicationId getApplicationId();
+
+ @Private
+ @Unstable
+ public abstract void setApplicationId(ApplicationId applicationId);
+
+ @Public
+ @Stable
+ public abstract LogAggregationState getLogAggregationState();
+
+ @Private
+ @Unstable
+ public abstract void setLogAggregationState(
+ LogAggregationState logAggregationState);
+
+ @Public
+ @Stable
+ public abstract Map getContainersLogStatus();
+
+ @Private
+ @Unstable
+ public abstract void setContainersLogStatus(
+ Map containersLogStatus);
+
+ @Public
+ @Stable
+ public abstract String getDiagnostic();
+
+ @Private
+ @Unstable
+ public abstract void setDiagnostic(String diagnostic);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 8e98703..09e1059 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -41,4 +41,9 @@ public abstract void setContainersStatuses(
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
+
+ public abstract List
+ getApplicationLogAggregationStatus();
+ public abstract void setApplicationLogAggregationStatus(
+ List applicationLogAggregationStatus);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java
new file mode 100644
index 0000000..7800851
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ApplicationLogAggregationStatusPBImpl.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerIdLogAggregateStateMapProto;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
+
+import com.google.protobuf.TextFormat;
+
+public class ApplicationLogAggregationStatusPBImpl extends ApplicationLogAggregationStatus{
+
+ ApplicationLogAggregationStatusProto proto =
+ ApplicationLogAggregationStatusProto.getDefaultInstance();
+ ApplicationLogAggregationStatusProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ApplicationId applicationId = null;
+ private Map containersLogStatus = null;
+
+ public ApplicationLogAggregationStatusPBImpl() {
+ builder = ApplicationLogAggregationStatusProto.newBuilder();
+ }
+
+ public ApplicationLogAggregationStatusPBImpl(ApplicationLogAggregationStatusProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized ApplicationLogAggregationStatusProto getProto() {
+ mergeLocalToProto();
+ this.proto = this.viaProto ? this.proto : this.builder.build();
+ this.viaProto = true;
+ return this.proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (this.viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ this.proto = this.builder.build();
+ this.viaProto = true;
+ }
+
+ private synchronized void maybeInitBuilder() {
+ if (this.viaProto || this.builder == null) {
+ this.builder = ApplicationLogAggregationStatusProto.newBuilder(this.proto);
+ }
+ this.viaProto = false;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.applicationId != null
+ && !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
+ builder.getApplicationId())) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+ if (this.containersLogStatus != null) {
+ addContainersLogStatusToProto();
+ }
+ }
+
+ private synchronized void addContainersLogStatusToProto() {
+ maybeInitBuilder();
+ builder.clearContainersLogStatus();
+ if (this.containersLogStatus == null) {
+ return;
+ }
+ Iterable extends ContainerIdLogAggregateStateMapProto> values =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator contaierIdIterator = containersLogStatus
+ .keySet().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return contaierIdIterator.hasNext();
+ }
+
+ @Override
+ public ContainerIdLogAggregateStateMapProto next() {
+ ContainerId key = contaierIdIterator.next();
+ return ContainerIdLogAggregateStateMapProto.newBuilder()
+ .setLogAggregationState(
+ convertToProtoFormat(containersLogStatus.get(key)))
+ .setContainerId(
+ convertToProtoFormat(key)).build();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ this.builder.addAllContainersLogStatus(values);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId containerId) {
+ return ((ContainerIdPBImpl) containerId).getProto();
+ }
+
+ private ContainerId convertFromProtoFormat(ContainerIdProto proto) {
+ return new ContainerIdPBImpl(proto);
+ }
+
+ @Override
+ public synchronized ApplicationId getApplicationId() {
+ ApplicationLogAggregationStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return this.applicationId;
+ }
+
+ private ApplicationId convertFromProtoFormat(ApplicationIdProto proto) {
+ return new ApplicationIdPBImpl(proto);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId appId) {
+ return ((ApplicationIdPBImpl)appId).getProto();
+ }
+
+ @Override
+ public synchronized void setApplicationId(ApplicationId applicationId) {
+ maybeInitBuilder();
+ if (applicationId == null) {
+ builder.clearApplicationId();
+ }
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ public synchronized LogAggregationState getLogAggregationState() {
+ ApplicationLogAggregationStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasLogAggregationState()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationState());
+ }
+
+ private LogAggregationState convertFromProtoFormat(
+ LogAggregationStateProto p) {
+ return ProtoUtils.convertFromProtoFormat(p);
+ }
+
+ @Override
+ public synchronized void setLogAggregationState(
+ LogAggregationState logAggregationState) {
+ maybeInitBuilder();
+ if (logAggregationState == null) {
+ builder.clearLogAggregationState();
+ return;
+ }
+ builder.setLogAggregationState(convertToProtoFormat(logAggregationState));
+ }
+
+ private LogAggregationStateProto convertToProtoFormat(
+ LogAggregationState logAggregationState) {
+ return ProtoUtils.convertToProtoFormat(logAggregationState);
+ }
+
+ @Override
+ public synchronized Map
+ getContainersLogStatus() {
+ initContainersLogStatus();
+ return this.containersLogStatus;
+ }
+
+ private synchronized void initContainersLogStatus() {
+ if (this.containersLogStatus != null) {
+ return;
+ }
+ ApplicationLogAggregationStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list =
+ p.getContainersLogStatusList();
+ this.containersLogStatus =
+ new HashMap(list.size());
+ for (ContainerIdLogAggregateStateMapProto pro : list) {
+ containersLogStatus.put(convertFromProtoFormat(pro.getContainerId()),
+ convertFromProtoFormat(pro.getLogAggregationState()));
+ }
+ }
+
+ @Override
+ public synchronized void setContainersLogStatus(
+ Map containersLogStatus) {
+ maybeInitBuilder();
+ if (this.containersLogStatus == null) {
+ builder.clearContainersLogStatus();
+ }
+ this.containersLogStatus = containersLogStatus;
+ }
+
+ @Override
+ public synchronized String getDiagnostic() {
+ ApplicationLogAggregationStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (! p.hasDiagnostic()) {
+ return null;
+ }
+ return p.getDiagnostic();
+ }
+
+ @Override
+ public synchronized void setDiagnostic(String diagnostic) {
+ maybeInitBuilder();
+ if (diagnostic == null) {
+ builder.clearDiagnostic();
+ return;
+ }
+ this.builder.setDiagnostic(diagnostic);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8ed7849..eb40989 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -33,9 +33,11 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationLogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -50,6 +52,7 @@
private List containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List keepAliveApplications = null;
+ private List applicationLogStatus = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@@ -80,6 +83,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
+ if (this.applicationLogStatus != null) {
+ addApplicationLogStatusToProto();
+ }
}
private synchronized void mergeLocalToProto() {
@@ -166,6 +172,42 @@ public void remove() {
builder.addAllKeepAliveApplications(iterable);
}
+ private synchronized void addApplicationLogStatusToProto() {
+ maybeInitBuilder();
+ builder.clearApplicationLogStatus();
+ if (applicationLogStatus == null)
+ {
+ return;
+ }
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter =
+ applicationLogStatus.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ApplicationLogAggregationStatusProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllApplicationLogStatus(iterable);
+ }
+
@Override
public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
@@ -255,7 +297,20 @@ private synchronized void initKeepAliveApplications() {
}
}
-
+
+ private synchronized void initApplicationLogStatus() {
+ if (this.applicationLogStatus != null) {
+ return;
+ }
+ NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getApplicationLogStatusList();
+ this.applicationLogStatus = new ArrayList();
+
+ for (ApplicationLogAggregationStatusProto c : list) {
+ this.applicationLogStatus.add(convertFromProtoFormat(c));
+ }
+ }
+
@Override
public synchronized NodeHealthStatus getNodeHealthStatus() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
@@ -310,4 +365,31 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
return ((ApplicationIdPBImpl)c).getProto();
}
+
+ private ApplicationLogAggregationStatusProto convertToProtoFormat(
+ ApplicationLogAggregationStatus c) {
+ return ((ApplicationLogAggregationStatusPBImpl)c).getProto();
+ }
+
+ private ApplicationLogAggregationStatus convertFromProtoFormat(
+ ApplicationLogAggregationStatusProto c) {
+ return new ApplicationLogAggregationStatusPBImpl(c);
+ }
+
+ @Override
+ public synchronized List
+ getApplicationLogAggregationStatus() {
+ initApplicationLogStatus();
+ return this.applicationLogStatus;
+ }
+
+ @Override
+ public synchronized void setApplicationLogAggregationStatus(
+ List applicationLogStatus) {
+ maybeInitBuilder();
+ if (applicationLogStatus == null) {
+ builder.clearApplicationLogStatus();
+ }
+ this.applicationLogStatus = applicationLogStatus;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 1b2a03e..00d9488 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -312,7 +313,8 @@ public static ApplicationReport newApplicationReport(
String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
- float progress, String appType, Token amRmToken) {
+ float progress, String appType, LogAggregationState logAggregationState,
+ Token amRmToken) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@@ -333,6 +335,7 @@ public static ApplicationReport newApplicationReport(
report.setOriginalTrackingUrl(origTrackingUrl);
report.setProgress(progress);
report.setApplicationType(appType);
+ report.setLogAggregationState(logAggregationState);
report.setAMRMToken(amRmToken);
return report;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 4f5d168..7d1dc0f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -30,12 +30,25 @@ enum NodeActionProto {
SHUTDOWN = 2;
}
+message ContainerIdLogAggregateStateMapProto {
+ optional ContainerIdProto container_id = 1;
+ optional LogAggregationStateProto logAggregationState = 2;
+}
+
+message ApplicationLogAggregationStatusProto {
+ optional ApplicationIdProto application_id = 1;
+ optional LogAggregationStateProto logAggregationState = 2;
+ repeated ContainerIdLogAggregateStateMapProto containersLogStatus = 3;
+ optional string diagnostic = 4;
+}
+
message NodeStatusProto {
optional NodeIdProto node_id = 1;
optional int32 response_id = 2;
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
repeated ApplicationIdProto keep_alive_applications = 5;
+ repeated ApplicationLogAggregationStatusProto applicationLogStatus = 6;
}
message MasterKeyProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 23f8754..e9016f8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -339,7 +339,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
- remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+ remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse,
+ remoteNodeStatus.getApplicationLogAggregationStatus()));
return nodeHeartBeatResponse;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fadaa3b..2efdb2d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -211,4 +212,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
+
+ /**
+ * Return the log aggregation State
+ * @return the log aggregation State of the RMApp
+ */
+ LogAggregationState getLogAggregationState();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index a2fa0e2..fba5e49 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -37,5 +37,8 @@
// Source: RMStateStore
APP_NEW_SAVED,
APP_UPDATE_SAVED,
- APP_REMOVED
+ APP_REMOVED,
+
+ // Source: RMNode
+ APP_LOG_AGGREGATION_STATUS_UPDATE
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index e3b083c..4e5f7ba 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -41,6 +42,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -50,6 +52,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -97,12 +100,18 @@
= new LinkedHashMap();
private final long submitTime;
private final Set updatedNodes = new HashSet();
+ private final Map appLogAggregationCompleted =
+ new HashMap();
+ private final Map appLogAggregationFailed =
+ new HashMap();
private final String applicationType;
+ private final long logAggregationTimeOut;
// Mutable fields
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
+ private int expectedNumOfAppLogStatus = 0;
private RMAppAttempt currentAttempt;
private String queue;
@SuppressWarnings("rawtypes")
@@ -234,6 +243,9 @@
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.KILL))
+ .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+ RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE,
+ new AppLogAggregationStatusUpdateTransition())
// Transitions from FAILED state
// ignorable transitions
@@ -249,7 +261,9 @@
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
-
+ .addTransition(RMAppState.KILLED, RMAppState.KILLED,
+ RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE,
+ new AppLogAggregationStatusUpdateTransition())
.installTopology();
private final StateMachine
@@ -302,6 +316,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
+ long timeout = conf.getLong(
+ YarnConfiguration.LOG_AGGREGATION_WATTING_MS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_WATTING_MS);
+ if (timeout <= 0) {
+ LOG.info("The value of "
+ + YarnConfiguration.LOG_AGGREGATION_WATTING_MS
+ + " is less than 0. Using the default value : "
+ + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
+ this.logAggregationTimeOut =
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS;
+ } else {
+ this.logAggregationTimeOut = timeout;
+ }
}
@Override
@@ -511,7 +538,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
this.name, host, rpcPort, clientToAMToken,
createApplicationState(), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
- appUsageReport, origTrackingUrl, progress, this.applicationType,
+ appUsageReport, origTrackingUrl, progress, this.applicationType,
+ getLogAggregationState(),
amrmToken);
} finally {
this.readLock.unlock();
@@ -889,6 +917,15 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
+ private static class AppLogAggregationStatusUpdateTransition extends RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ RMAppLogAggregationStatusUpdateEvent updateEvent =
+ (RMAppLogAggregationStatusUpdateEvent) event;
+ app.updateLogAggregationStatus(updateEvent.getApplicationLogStatus(),
+ updateEvent.getNodeId());
+ }
+ }
private static class AppKilledTransition extends FinalTransition {
@Override
@@ -911,6 +948,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
super.transition(app, event);
}
}
+
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -933,6 +971,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
@SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) {
Set nodes = getNodesOnWhichAttemptRan(app);
+ app.expectedNumOfAppLogStatus = nodes.size();
for (NodeId nodeId : nodes) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
@@ -1017,4 +1056,50 @@ public YarnApplicationState createApplicationState() {
throw new YarnRuntimeException("Unknown state passed!");
}
}
+
+ private void updateLogAggregationStatus(ApplicationLogAggregationStatus status, NodeId nodeId) {
+ this.writeLock.lock();
+ try {
+ if (status.getLogAggregationState() == LogAggregationState.COMPLETED) {
+ appLogAggregationCompleted.put(nodeId, status);
+ } else if (status.getLogAggregationState() == LogAggregationState.FAILED) {
+ appLogAggregationFailed.put(nodeId, status);
+ }
+ if (status.getDiagnostic() != null && !status.getDiagnostic().isEmpty()) {
+ this.diagnostics.append(status.getDiagnostic() + "\n");
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
+ public LogAggregationState getLogAggregationState() {
+ this.readLock.lock();
+ try {
+ if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
+ return LogAggregationState.DISABLED;
+ }
+ RMAppState currentState = getState();
+ if (currentState != RMAppState.FINISHED
+ && currentState != RMAppState.KILLED
+ && currentState != RMAppState.FAILED) {
+ return LogAggregationState.NOT_STARTED;
+ }
+ if (appLogAggregationFailed.size() > 0 || expectedNumOfAppLogStatus == 0) {
+ return LogAggregationState.FAILED;
+ } else if (appLogAggregationCompleted.size() == expectedNumOfAppLogStatus) {
+ return LogAggregationState.COMPLETED;
+ } else {
+ if (System.currentTimeMillis() - this.finishTime
+ <= this.logAggregationTimeOut) {
+ return LogAggregationState.IN_PROGRESS;
+ }
+ return LogAggregationState.TIME_OUT;
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java
new file mode 100644
index 0000000..135332b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusUpdateEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
+
+public class RMAppLogAggregationStatusUpdateEvent extends RMAppEvent {
+
+ private final NodeId nodeId;
+ private final ApplicationLogAggregationStatus applicationLogStatus;
+
+ public RMAppLogAggregationStatusUpdateEvent(ApplicationId appId,
+ NodeId nodeId, ApplicationLogAggregationStatus applicationLogStatus) {
+ super(appId, RMAppEventType.APP_LOG_AGGREGATION_STATUS_UPDATE);
+ this.nodeId = nodeId;
+ this.applicationLogStatus = applicationLogStatus;
+ }
+
+ public NodeId getNodeId() {
+ return this.nodeId;
+ };
+
+ public ApplicationLogAggregationStatus getApplicationLogStatus() {
+ return this.applicationLogStatus;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 52bc285..d37e000 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -48,11 +48,14 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLogAggregationStatusUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -648,6 +651,11 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
statusEvent.getKeepAliveAppIds());
}
+ if (statusEvent.getApplicationLogAggregationStatus() != null
+ && !statusEvent.getApplicationLogAggregationStatus().isEmpty()) {
+ rmNode.sendApplicationLogStatus(statusEvent
+ .getApplicationLogAggregationStatus());
+ }
return NodeState.RUNNING;
}
}
@@ -694,6 +702,20 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
return latestContainerInfoList;
}
+ private void sendApplicationLogStatus(
+ List applicationLogStatus) {
+ for(ApplicationLogAggregationStatus status : applicationLogStatus) {
+ RMApp app =
+ context.getRMApps().get(
+ status.getApplicationId());
+ if (app != null) {
+ this.context.getDispatcher().getEventHandler().handle(
+ new RMAppLogAggregationStatusUpdateEvent(
+ status.getApplicationId(), this.nodeId, status));
+ }
+ }
+ }
+
@VisibleForTesting
public void setNextHeartBeat(boolean nextHeartBeat) {
this.nextHeartBeat = nextHeartBeat;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index abfacbb..c8234de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
public class RMNodeStatusEvent extends RMNodeEvent {
@@ -32,15 +33,18 @@
private final List containersCollection;
private final NodeHeartbeatResponse latestResponse;
private final List keepAliveAppIds;
+ private final List applicationLogStatus;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List collection, List keepAliveAppIds,
- NodeHeartbeatResponse latestResponse) {
+ NodeHeartbeatResponse latestResponse,
+ List applicationLogStatus) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
+ this.applicationLogStatus = applicationLogStatus;
}
public NodeHealthStatus getNodeHealthStatus() {
@@ -58,4 +62,9 @@ public NodeHeartbeatResponse getLatestResponse() {
public List getKeepAliveAppIds() {
return this.keepAliveAppIds;
}
+
+ public List
+ getApplicationLogAggregationStatus() {
+ return this.applicationLogStatus;
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java
new file mode 100644
index 0000000..9909620
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLogAggregationStatus.java
@@ -0,0 +1,514 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.TestRMAppTransitions;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+public class TestLogAggregationStatus {
+ static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
+
+ private boolean isSecurityEnabled;
+ private Configuration conf;
+ private RMContext rmContext;
+ private static int maxAppAttempts =
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
+ private static int container_id = 1;
+ private static int host_id = 1;
+ private List nodes = new ArrayList();
+ private ApplicationId applicationId = MockApps.newAppID(1);
+ private DrainDispatcher rmDispatcher;
+ private RMStateStore store;
+ private YarnScheduler scheduler;
+ private final int max_wait_time = 20;
+ private Container amContainer;
+
+ // handle all the RM application attempt events
+ private static final class TestApplicationAttemptEventDispatcher implements
+ EventHandler {
+
+ private final RMContext rmContext;
+ public TestApplicationAttemptEventDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public void handle(RMAppAttemptEvent event) {
+ ApplicationId appId = event.getApplicationAttemptId().getApplicationId();
+ RMApp rmApp = this.rmContext.getRMApps().get(appId);
+ if (rmApp != null) {
+ try {
+ rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + appId, t);
+ }
+ }
+ }
+ }
+
+ // handle all the RM application events - same as in ResourceManager.java
+ private static final class TestApplicationEventDispatcher implements
+ EventHandler {
+
+ private final RMContext rmContext;
+ public TestApplicationEventDispatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public void handle(RMAppEvent event) {
+ ApplicationId appID = event.getApplicationId();
+ RMApp rmApp = this.rmContext.getRMApps().get(appID);
+ if (rmApp != null) {
+ try {
+ rmApp.handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + appID, t);
+ }
+ }
+ }
+ }
+
+ // handle all the RM application manager events - same as in
+ // ResourceManager.java
+ private static final class TestApplicationManagerEventDispatcher implements
+ EventHandler {
+ @Override
+ public void handle(RMAppManagerEvent event) {
+ }
+ }
+
+ // handle all the scheduler events - same as in ResourceManager.java
+ private static final class TestSchedulerEventDispatcher implements
+ EventHandler {
+ @Override
+ public void handle(SchedulerEvent event) {
+ }
+ }
+
+ // handle all the RM Node events
+ private static final class TestRMNodeEventDispatcher implements
+ EventHandler {
+ @Override
+ public void handle(RMNodeEvent event) {
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
+ if (isSecurityEnabled) {
+ authMethod = AuthenticationMethod.KERBEROS;
+ }
+ SecurityUtil.setAuthenticationMethod(authMethod, conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ rmDispatcher = new DrainDispatcher();
+ ContainerAllocationExpirer containerAllocationExpirer =
+ mock(ContainerAllocationExpirer.class);
+ AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
+ AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
+ store = mock(RMStateStore.class);
+ this.rmContext =
+ new RMContextImpl(rmDispatcher, store,
+ containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+ null, new AMRMTokenSecretManager(conf),
+ new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM());
+
+ rmDispatcher.register(RMAppAttemptEventType.class,
+ new TestApplicationAttemptEventDispatcher(this.rmContext));
+
+ rmDispatcher.register(RMAppEventType.class,
+ new TestApplicationEventDispatcher(rmContext));
+
+ rmDispatcher.register(RMAppManagerEventType.class,
+ new TestApplicationManagerEventDispatcher());
+
+ rmDispatcher.register(SchedulerEventType.class,
+ new TestSchedulerEventDispatcher());
+
+ NodesListManager mockNodeListManager = mock(NodesListManager.class);
+ doAnswer(new Answer