> table =
div.h3(
"Log Aggregation: "
- + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+ + (rmApp == null ? "N/A" : rmApp
+ .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp
+ .getLogAggregationStatusForAppReport().name())).table(
"#LogAggregationStatus");
table.
tr().
th(_TH, "NodeId").
th(_TH, "Log Aggregation Status").
- th(_TH, "Diagnostis Message").
+ th(_TH, "Last 10 Diagnostis Messages").
+ th(_TH, "Last 10 Failure Messages").
_();
- RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
if (rmApp != null) {
Map
logAggregationReports =
rmApp.getLogAggregationReportsForApp();
@@ -136,10 +131,14 @@ protected void render(Block html) {
String message =
report.getValue() == null ? null : report.getValue()
.getDiagnosticMessage();
+ String failureMessage =
+ report.getValue() == null ? null : report.getValue()
+ .getFailureMessages();
table.tr()
.td(report.getKey().toString())
.td(status == null ? "N/A" : status.toString())
- .td(message == null ? "N/A" : message)._();
+ .td(message == null ? "N/A" : message)
+ .td(failureMessage == null ? "N/A" : failureMessage)._();
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 4eec63f..40fa80d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -23,7 +23,7 @@
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -155,26 +155,26 @@ public void testLogAggregationStatus() throws Exception {
.getLogAggregationStatus());
}
- Map node1ReportForApp =
- new HashMap();
+ List node1ReportForApp =
+ new ArrayList();
String messageForNode1_1 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.RUNNING, messageForNode1_1);
- node1ReportForApp.put(appId, report1);
+ LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
+ messageForNode1_1, "");
+ node1ReportForApp.add(report1);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node1ReportForApp));
- Map node2ReportForApp =
- new HashMap();
+ List node2ReportForApp =
+ new ArrayList();
String messageForNode2_1 =
"node2 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report2 =
- LogAggregationReport.newInstance(appId, nodeId2,
- LogAggregationStatus.RUNNING, messageForNode2_1);
- node2ReportForApp.put(appId, report2);
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING, messageForNode2_1, "");
+ node2ReportForApp.add(report2);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node2ReportForApp));
@@ -190,12 +190,12 @@ public void testLogAggregationStatus() throws Exception {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1, report.getValue()
+ Assert.assertEquals(messageForNode1_1 + "\n", report.getValue()
.getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode2_1, report.getValue()
+ Assert.assertEquals(messageForNode2_1 + "\n", report.getValue()
.getDiagnosticMessage());
} else {
// should not contain log aggregation report for other nodes
@@ -205,14 +205,14 @@ public void testLogAggregationStatus() throws Exception {
}
// node1 updates its log aggregation status again
- Map node1ReportForApp2 =
- new HashMap();
+ List node1ReportForApp2 =
+ new ArrayList();
String messageForNode1_2 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1_2 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.RUNNING, messageForNode1_2);
- node1ReportForApp2.put(appId, report1_2);
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING, messageForNode1_2, "");
+ node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node1ReportForApp2));
@@ -230,12 +230,13 @@ public void testLogAggregationStatus() throws Exception {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
- .getValue().getDiagnosticMessage());
+ Assert.assertEquals(
+ messageForNode1_1 + "\n" + messageForNode1_2 + "\n", report
+ .getValue().getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode2_1, report.getValue()
+ Assert.assertEquals(messageForNode2_1 + "\n", report.getValue()
.getDiagnosticMessage());
} else {
// should not contain log aggregation report for other nodes
@@ -268,15 +269,22 @@ public void testLogAggregationStatus() throws Exception {
// Finally, node1 finished its log aggregation and sent out its final
// log aggregation status. The log aggregation status for node1 should
// be changed from TIME_OUT to SUCCEEDED
- Map node1ReportForApp3 =
- new HashMap();
+ List node1ReportForApp3 =
+ new ArrayList();
String messageForNode1_3 =
"node1 final logAggregation status updated at "
+ System.currentTimeMillis();
- LogAggregationReport report1_3 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.SUCCEEDED, messageForNode1_3);
- node1ReportForApp3.put(appId, report1_3);
+ LogAggregationReport report1_3;
+ for (int i = 1; i < 10 ; i ++) {
+ report1_3 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING, "test_message_" + i, "");
+ node1ReportForApp3.add(report1_3);
+ }
+ node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.SUCCEEDED, messageForNode1_3, ""));
+ // For every logAggregationReport cached in memory, we can only save at most
+ // 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node1ReportForApp3));
@@ -290,8 +298,15 @@ public void testLogAggregationStatus() throws Exception {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1 + messageForNode1_2
- + messageForNode1_3, report.getValue().getDiagnosticMessage());
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i < 10; i ++) {
+ builder.append("test_message_" + i);
+ builder.append("\n");
+ }
+ builder.append(messageForNode1_3);
+ builder.append("\n");
+ Assert.assertEquals(builder.toString(), report.getValue()
+ .getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
.getLogAggregationStatus());
@@ -301,6 +316,28 @@ public void testLogAggregationStatus() throws Exception {
.fail("should not contain log aggregation report for other nodes");
}
}
+
+ // update log aggregationStatus for node2 as FAILED,
+ // so the log aggregation status for the App will become FAILED,
+ // and we only keep the log aggregation reports whose status is FAILED,
+ // so the log aggregation report for node1 will be removed.
+ List node2ReportForApp2 =
+ new ArrayList();
+ LogAggregationReport report2_2 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.FAILED, "", "Fail_Message");
+ node2ReportForApp2.add(report2_2);
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList(), null,
+ null, node2ReportForApp2));
+ Assert.assertEquals(LogAggregationStatus.FAILED,
+ rmApp.getLogAggregationStatusForAppReport());
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertTrue(logAggregationStatus.size() == 1);
+ Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
+ Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
+ Assert.assertEquals("Fail_Message" + "\n",
+ logAggregationStatus.get(node2.getNodeID()).getFailureMessages());
}
@Test (timeout = 10000)
@@ -317,9 +354,11 @@ public void testGetLogAggregationStatusForAppReport() {
// Enable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
rmApp = (RMAppImpl)createRMApp(conf);
- // If we do not know any NodeManagers for this application ,
- // the log aggregation status will return null
- Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
+ // If we do not know any NodeManagers for this application , and
+ // the log aggregation is enabled, the log aggregation status will
+ // return NOT_START
+ Assert.assertEquals(LogAggregationStatus.NOT_START,
+ rmApp.getLogAggregationStatusForAppReport());
NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
@@ -329,24 +368,32 @@ public void testGetLogAggregationStatusForAppReport() {
// If the log aggregation status for all NMs are NOT_START,
// the log aggregation status for this app will return NOT_START
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING,
+ "", ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
@@ -357,13 +404,17 @@ public void testGetLogAggregationStatusForAppReport() {
// others are SUCCEEDED, the log aggregation status for this app will
// return TIME_OUT
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT,
+ "", ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport());
@@ -371,17 +422,38 @@ public void testGetLogAggregationStatusForAppReport() {
// is at the final state, the log aggregation status for this app will
// return SUCCEEDED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "",""));
Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
+ // If the log aggregation status for at least one of NMs are RUNNING,
+ // the log aggregation status for this app will return RUNNING
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING,
+ "", ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START,
+ "", ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING,
+ rmApp.getLogAggregationStatusForAppReport());
+
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is FAILED,
@@ -389,13 +461,17 @@ public void testGetLogAggregationStatusForAppReport() {
// at the final state, the log aggregation status for this app
// will return FAILED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT,
+ "", ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.FAILED,
+ "", ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED,
+ "", ""));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());