diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java index b2270d8..5f62509 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java @@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; /** @@ -32,9 +31,9 @@ * It includes details such as: * * */ @@ -45,11 +44,13 @@ @Public @Unstable public static LogAggregationReport newInstance(ApplicationId appId, - NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) { + LogAggregationStatus status, String diagnosticMessage, + String failureMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); report.setLogAggregationStatus(status); report.setDiagnosticMessage(diagnosticMessage); + report.setFailureMessages(failureMessage); return report; } @@ -66,18 +67,6 @@ public static LogAggregationReport newInstance(ApplicationId appId, public abstract void setApplicationId(ApplicationId appId); /** - * Get the NodeId. - * @return NodeId - */ - @Public - @Unstable - public abstract NodeId getNodeId(); - - @Public - @Unstable - public abstract void setNodeId(NodeId nodeId); - - /** * Get the LogAggregationStatus. * @return LogAggregationStatus */ @@ -101,4 +90,18 @@ public abstract void setLogAggregationStatus( @Public @Unstable public abstract void setDiagnosticMessage(String diagnosticMessage); + + /** + * Get all the failure information for which the log aggregation + * fails in any cycles + * + * @return failure information + */ + @Public + @Unstable + public abstract String getFailureMessages(); + + @Public + @Unstable + public abstract void setFailureMessages(String failureMessage); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 227363f..767e4b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,10 +18,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import java.util.Map; +import java.util.List; import java.util.Set; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -54,9 +53,9 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract Set getNodeLabels(); public abstract void setNodeLabels(Set nodeLabels); - public abstract Map + public abstract List getLogAggregationReportsForApps(); public abstract void setLogAggregationReportsForApps( - Map logAggregationReportsForApps); + List logAggregationReportsForApps); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java index 75b6eab..e3a0ef0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java @@ -22,13 +22,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -45,7 +42,6 @@ boolean viaProto = false; private ApplicationId applicationId; - private NodeId nodeId; public LogAggregationReportPBImpl() { builder = LogAggregationReportProto.newBuilder(); @@ -89,12 +85,6 @@ private void mergeLocalToBuilder() { builder.getApplicationId())) { builder.setApplicationId(convertToProtoFormat(this.applicationId)); } - - if (this.nodeId != null - && !((NodeIdPBImpl) this.nodeId).getProto().equals( - builder.getNodeId())) { - builder.setNodeId(convertToProtoFormat(this.nodeId)); - } } private void mergeLocalToProto() { @@ -193,32 +183,21 @@ public void setDiagnosticMessage(String diagnosticMessage) { } @Override - public NodeId getNodeId() { - if (this.nodeId != null) { - return this.nodeId; - } - + public String getFailureMessages() { LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasNodeId()) { + if (!p.hasFailureMessage()) { return null; } - this.nodeId = convertFromProtoFormat(p.getNodeId()); - return this.nodeId; + return p.getFailureMessage(); } @Override - public void setNodeId(NodeId nodeId) { + public void setFailureMessages(String failureMessage) { maybeInitBuilder(); - if (nodeId == null) - builder.clearNodeId(); - this.nodeId = nodeId; - } - - private NodeIdProto convertToProtoFormat(NodeId t) { - return ((NodeIdPBImpl) t).getProto(); - } - - private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) { - return new NodeIdPBImpl(nodeId); + if (failureMessage == null) { + builder.clearFailureMessage(); + return; + } + builder.setFailureMessage(failureMessage); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 03db39c..77fbc7e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,21 +18,16 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; -import java.util.HashMap; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -51,8 +46,7 @@ private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; private Set labels = null; - private Map - logAggregationReportsForApps = null; + private List logAggregationReportsForApps = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -110,12 +104,35 @@ private void mergeLocalToBuilder() { private void addLogAggregationStatusForAppsToProto() { maybeInitBuilder(); builder.clearLogAggregationReportsForApps(); - for (Entry entry : logAggregationReportsForApps - .entrySet()) { - builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto - .newBuilder().setAppId(convertToProtoFormat(entry.getKey())) - .setLogAggregationReport(convertToProtoFormat(entry.getValue()))); + if (this.logAggregationReportsForApps == null) { + return; } + Iterable it = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = + logAggregationReportsForApps.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public LogAggregationReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllLogAggregationReportsForApps(it); } private LogAggregationReportProto convertToProtoFormat( @@ -246,17 +263,8 @@ private void initNodeLabels() { labels = new HashSet(nodeLabels.getElementsList()); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - @Override - public Map - getLogAggregationReportsForApps() { + public List getLogAggregationReportsForApps() { if (this.logAggregationReportsForApps != null) { return this.logAggregationReportsForApps; } @@ -266,15 +274,11 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId t) { private void initLogAggregationReportsForApps() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = + List list = p.getLogAggregationReportsForAppsList(); - this.logAggregationReportsForApps = - new HashMap(); - for (LogAggregationReportsForAppsProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - LogAggregationReport report = - convertFromProtoFormat(c.getLogAggregationReport()); - this.logAggregationReportsForApps.put(appId, report); + this.logAggregationReportsForApps = new ArrayList(); + for (LogAggregationReportProto c : list) { + this.logAggregationReportsForApps.add(convertFromProtoFormat(c)); } } @@ -285,14 +289,10 @@ private LogAggregationReport convertFromProtoFormat( @Override public void setLogAggregationReportsForApps( - Map logAggregationStatusForApps) { - if (logAggregationStatusForApps == null - || logAggregationStatusForApps.isEmpty()) { - return; + List logAggregationStatusForApps) { + if(logAggregationStatusForApps == null) { + builder.clearLogAggregationReportsForApps(); } - maybeInitBuilder(); - this.logAggregationReportsForApps = - new HashMap(); - this.logAggregationReportsForApps.putAll(logAggregationStatusForApps); + this.logAggregationReportsForApps = logAggregationStatusForApps; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java index dd5a4c8..043e178 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; @@ -192,8 +193,17 @@ public ApplicationReport run() throws Exception { : "ApplicationMaster"); if (webUiType != null && webUiType.equals(YarnWebParams.RM_WEB_UI)) { - overviewTable._("Log Aggregation Status", - root_url("logaggregationstatus", app.getAppId()), "Status"); + LogAggregationStatus status = getLogAggregationStatus(); + if (status == null) { + overviewTable._("Log Aggregation Status", "N/A"); + } else if (status == LogAggregationStatus.DISABLED + || status == LogAggregationStatus.NOT_START + || status == LogAggregationStatus.SUCCEEDED) { + overviewTable._("Log Aggregation Status", status.name()); + } else { + overviewTable._("Log Aggregation Status", + root_url("logaggregationstatus", app.getAppId()), status.name()); + } } overviewTable._("Diagnostics:", app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo()); @@ -342,4 +352,8 @@ private String clairfyAppFinalStatus(FinalApplicationStatus status) { protected void createApplicationMetricsTable(Block html) { } + + protected LogAggregationStatus getLogAggregationStatus() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d34c9f7..25080d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -50,19 +50,14 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; optional StringArrayProto nodeLabels = 4; - repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5; -} - -message LogAggregationReportsForAppsProto { - optional ApplicationIdProto appId = 1; - optional LogAggregationReportProto log_aggregation_report = 2; + repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; } message LogAggregationReportProto { -optional ApplicationIdProto application_id = 1; -optional NodeIdProto node_id = 2; -optional LogAggregationStatusProto log_aggregation_status = 3; -optional string diagnostics = 4 [default = "N/A"]; + optional ApplicationIdProto application_id = 1; + optional LogAggregationStatusProto log_aggregation_status = 2; + optional string diagnostics = 3 [default = "N/A"]; + optional string failure_message = 4 [default = "N/A"]; } message NodeHeartbeatResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b1ab5f1..a693b7e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -665,7 +664,7 @@ public void run() { if (logAggregationEnabled) { // pull log aggregation status for application running in this NM - Map logAggregationReports = + List logAggregationReports = getLogAggregationReportsForApps(context .getLogAggregationStatusForApps()); if (logAggregationReports != null @@ -809,47 +808,14 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { statusUpdater.start(); } - private Map - getLogAggregationReportsForApps( - ConcurrentLinkedQueue lastestLogAggregationStatus) { - Map latestLogAggregationReports = - new HashMap(); + private List getLogAggregationReportsForApps( + ConcurrentLinkedQueue lastestLogAggregationStatus) { LogAggregationReport status; while ((status = lastestLogAggregationStatus.poll()) != null) { this.logAggregationReportForAppsTempList.add(status); } - for (LogAggregationReport logAggregationReport - : this.logAggregationReportForAppsTempList) { - LogAggregationReport report = null; - if (latestLogAggregationReports.containsKey(logAggregationReport - .getApplicationId())) { - report = - latestLogAggregationReports.get(logAggregationReport - .getApplicationId()); - report.setLogAggregationStatus(logAggregationReport - .getLogAggregationStatus()); - String message = report.getDiagnosticMessage(); - if (logAggregationReport.getDiagnosticMessage() != null - && !logAggregationReport.getDiagnosticMessage().isEmpty()) { - if (message != null) { - message += logAggregationReport.getDiagnosticMessage(); - } else { - message = logAggregationReport.getDiagnosticMessage(); - } - report.setDiagnosticMessage(message); - } - } else { - report = Records.newRecord(LogAggregationReport.class); - report.setApplicationId(logAggregationReport.getApplicationId()); - report.setNodeId(this.nodeId); - report.setLogAggregationStatus(logAggregationReport - .getLogAggregationStatus()); - report - .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage()); - } - latestLogAggregationReports.put(logAggregationReport.getApplicationId(), - report); - } - return latestLogAggregationReports; + List reports = new ArrayList(); + reports.addAll(logAggregationReportForAppsTempList); + return reports; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 3f09e5d..fa1bd40 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -306,6 +306,7 @@ private void uploadLogsForContainers(boolean appFinished) { + currentTime); String diagnosticMessage = ""; + boolean logAggregationSucceedInThisCycle = true; final boolean rename = uploadedLogsInThisCycle; try { userUgi.doAs(new PrivilegedExceptionAction() { @@ -332,6 +333,7 @@ public Object run() throws Exception { "Failed to move temporary log file to final location: [" + remoteNodeTmpLogFileForApp + "] to [" + renamedPath + "]", e); + logAggregationSucceedInThisCycle = false; diagnosticMessage = "Log uploaded failed for Application: " + appId + " in NodeManager: " @@ -343,8 +345,10 @@ public Object run() throws Exception { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); - report.setNodeId(nodeId); report.setDiagnosticMessage(diagnosticMessage); + if (!logAggregationSucceedInThisCycle) { + report.setFailureMessages(diagnosticMessage); + } if (appFinished) { report.setLogAggregationStatus(renameTemporaryLogFileFailed ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 8abc478..2fa9723 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -22,12 +22,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -106,6 +109,7 @@ private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; + private static int MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY = 10; // Immutable fields private final ApplicationId applicationId; @@ -152,6 +156,12 @@ private final Map logAggregationStatus = new HashMap(); private LogAggregationStatus logAggregationStatusForAppReport; + private int logAggregationSucceed = 0; + private int logAggregationFailed = 0; + private Map> LogAggregationDiagnosticsForNMs = + new HashMap>(); + private Map> LogAggregationFailureMessagesForNMs = + new HashMap>(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -437,6 +447,11 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + if (this.logAggregationEnabled) { + this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; + } else { + this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED; + } } @Override @@ -834,10 +849,9 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) { app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), - LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent - .getNodeId(), app.logAggregationEnabled - ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, - "")); + LogAggregationReport.newInstance(app.applicationId, + app.logAggregationEnabled ? LogAggregationStatus.NOT_START + : LogAggregationStatus.DISABLED, "", "")); } }; } @@ -1401,18 +1415,20 @@ protected Credentials parseCredentials() throws IOException { Map outputs = new HashMap(); outputs.putAll(logAggregationStatus); - for (Entry output : outputs.entrySet()) { - if (!output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.TIME_OUT) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { - output.getValue().setLogAggregationStatus( - LogAggregationStatus.TIME_OUT); + if (!isLogAggregationFinished()) { + for (Entry output : outputs.entrySet()) { + if (!output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.TIME_OUT) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.FAILED) + && isAppInFinalState(this) + && System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + output.getValue().setLogAggregationStatus( + LogAggregationStatus.TIME_OUT); + } } } return outputs; @@ -1424,10 +1440,12 @@ protected Credentials parseCredentials() throws IOException { public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { try { this.writeLock.lock(); - if (this.logAggregationEnabled) { + if (this.logAggregationEnabled && !isLogAggregationFinished()) { LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); + boolean statusChanged = false; if (curReport == null) { this.logAggregationStatus.put(nodeId, report); + statusChanged = true; } else { if (curReport.getLogAggregationStatus().equals( LogAggregationStatus.TIME_OUT)) { @@ -1437,17 +1455,103 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { LogAggregationStatus.FAILED)) { curReport.setLogAggregationStatus(report .getLogAggregationStatus()); + statusChanged = true; } } else { - curReport.setLogAggregationStatus(report.getLogAggregationStatus()); + if (!curReport.getLogAggregationStatus().equals( + LogAggregationStatus.SUCCEEDED) + && !curReport.getLogAggregationStatus().equals( + LogAggregationStatus.FAILED) + && (report.getLogAggregationStatus().equals( + LogAggregationStatus.SUCCEEDED) + || report.getLogAggregationStatus().equals( + LogAggregationStatus.FAILED))) { + statusChanged = true; + } + curReport.setLogAggregationStatus( + report.getLogAggregationStatus()); + } + } + if (report.getDiagnosticMessage() != null + && !report.getDiagnosticMessage().isEmpty()) { + List diagnostics = + LogAggregationDiagnosticsForNMs.get(nodeId); + if (diagnostics == null) { + diagnostics = new ArrayList(); + } else { + if (diagnostics.size() == + MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY) { + diagnostics.remove(0); + } + } + diagnostics.add(report.getDiagnosticMessage()); + LogAggregationDiagnosticsForNMs.put(nodeId, diagnostics); + StringBuilder builder = new StringBuilder(); + for (String message : diagnostics) { + builder.append(message); + builder.append("\n"); + } + this.logAggregationStatus.get(nodeId).setDiagnosticMessage( + builder.toString()); + } + if (report.getFailureMessages() != null + && !report.getFailureMessages().isEmpty()) { + List failureMessages = + LogAggregationFailureMessagesForNMs.get(nodeId); + if (failureMessages == null) { + failureMessages = new ArrayList(); + } else { + if (failureMessages.size() == + MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY) { + failureMessages.remove(0); + } } + failureMessages.add(report.getFailureMessages()); + LogAggregationFailureMessagesForNMs.put(nodeId, failureMessages); + StringBuilder builder = new StringBuilder(); + for (String failure : failureMessages) { + builder.append(failure); + builder.append("\n"); + } + this.logAggregationStatus.get(nodeId).setFailureMessages( + builder.toString()); + } - if (report.getDiagnosticMessage() != null - && !report.getDiagnosticMessage().isEmpty()) { - curReport - .setDiagnosticMessage(curReport.getDiagnosticMessage() == null - ? report.getDiagnosticMessage() : curReport - .getDiagnosticMessage() + report.getDiagnosticMessage()); + if (isAppInFinalState(this) && statusChanged) { + LogAggregationStatus status = + this.logAggregationStatus.get(nodeId).getLogAggregationStatus(); + if (status.equals(LogAggregationStatus.SUCCEEDED)) { + this.logAggregationSucceed++; + } else if (status.equals(LogAggregationStatus.FAILED)) { + this.logAggregationFailed++; + } + if (this.logAggregationSucceed == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; + // Since the log aggregation status for this application for all NMs + // is SUCCEEDED, it means all logs are aggregated successfully. + // We could remove all the cached log aggregation reports + this.logAggregationStatus.clear(); + this.LogAggregationDiagnosticsForNMs.clear(); + this.LogAggregationFailureMessagesForNMs.clear(); + } else if (this.logAggregationSucceed + this.logAggregationFailed + == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; + // We have collected the log aggregation status for all NMs. + // The log aggregation status is FAILED which means the log + // aggregation fails in some NMs. We are only interested in the + // nodes where the log aggregation is failed. So we could remove + // the log aggregation details for those succeeded NMs + for (Iterator> it = + this.logAggregationStatus.entrySet().iterator(); it.hasNext();) { + Map.Entry entry = it.next(); + if (entry.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED)) { + it.remove(); + // Also remove the diagnostics for this NM + this.LogAggregationDiagnosticsForNMs.remove(entry.getKey()); + } + } } } } @@ -1458,19 +1562,15 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { @Override public LogAggregationStatus getLogAggregationStatusForAppReport() { - if (!logAggregationEnabled) { - return LogAggregationStatus.DISABLED; - } - if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED - || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) { - return this.logAggregationStatusForAppReport; - } try { this.readLock.lock(); + if (isLogAggregationFinished()) { + return this.logAggregationStatusForAppReport; + } Map reports = getLogAggregationReportsForApp(); if (reports.size() == 0) { - return null; + return this.logAggregationStatusForAppReport; } int logNotStartCount = 0; int logCompletedCount = 0; @@ -1506,13 +1606,11 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { // the log aggregation is finished. And the log aggregation status will // not be updated anymore. if (logFailedCount > 0 && isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED; } else if (logTimeOutCount > 0) { return LogAggregationStatus.TIME_OUT; } if (isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED; } } @@ -1521,4 +1619,16 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { this.readLock.unlock(); } } + + private boolean isLogAggregationFinished() { + try { + this.readLock.lock(); + return this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.SUCCEEDED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.FAILED); + } finally { + this.readLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3be1867..a11aacf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -22,8 +22,6 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -777,7 +775,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.handleContainerStatus(statusEvent.getContainers()); - Map logAggregationReportsForApps = + List logAggregationReportsForApps = statusEvent.getLogAggregationReportsForApps(); if (logAggregationReportsForApps != null && !logAggregationReportsForApps.isEmpty()) { @@ -915,12 +913,11 @@ private void handleContainerStatus(List containerStatuses) { } private void handleLogAggregationStatus( - Map logAggregationReportsForApps) { - for (Entry report : - logAggregationReportsForApps.entrySet()) { - RMApp rmApp = this.context.getRMApps().get(report.getKey()); + List logAggregationReportsForApps) { + for (LogAggregationReport report : logAggregationReportsForApps) { + RMApp rmApp = this.context.getRMApps().get(report.getApplicationId()); if (rmApp != null) { - ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue()); + ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 4bbf610..b95d7d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.List; -import java.util.Map; - import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -34,7 +32,7 @@ private final List containersCollection; private final NodeHeartbeatResponse latestResponse; private final List keepAliveAppIds; - private Map logAggregationReportsForApps; + private List logAggregationReportsForApps; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, @@ -50,7 +48,7 @@ public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, NodeHeartbeatResponse latestResponse, - Map logAggregationReportsForApps) { + List logAggregationReportsForApps) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; @@ -75,13 +73,12 @@ public NodeHeartbeatResponse getLatestResponse() { return this.keepAliveAppIds; } - public Map - getLogAggregationReportsForApps() { + public List getLogAggregationReportsForApps() { return this.logAggregationReportsForApps; } public void setLogAggregationReportsForApps( - Map logAggregationReportsForApps) { + List logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java index 43e26be..38e0e3b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.AppBlock; -import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; @@ -170,4 +170,13 @@ protected void generateApplicationTable(Block html, tbody._()._(); } + + @Override + protected LogAggregationStatus getLogAggregationStatus() { + RMApp rmApp = this.rm.getRMContext().getRMApps().get(appID); + if (rmApp == null) { + return null; + } + return rmApp.getLogAggregationStatusForAppReport(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java index a2f61e3..083bb25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java @@ -28,11 +28,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -49,14 +47,11 @@ private static final Log LOG = LogFactory .getLog(RMAppLogAggregationStatusBlock.class); private final ResourceManager rm; - private final Configuration conf; @Inject - RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm, - Configuration conf) { + RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm) { super(ctx); this.rm = rm; - this.conf = conf; } @Override @@ -106,24 +101,24 @@ protected void render(Block html) { table_description._(); div_description._(); - boolean logAggregationEnabled = - conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + RMApp rmApp = rm.getRMContext().getRMApps().get(appId); // Application Log aggregation status Table DIV div = html.div(_INFO_WRAP); TABLE> table = div.h3( "Log Aggregation: " - + (logAggregationEnabled ? "Enabled" : "Disabled")).table( + + (rmApp == null ? "N/A" : rmApp + .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp + .getLogAggregationStatusForAppReport().name())).table( "#LogAggregationStatus"); table. tr(). th(_TH, "NodeId"). th(_TH, "Log Aggregation Status"). - th(_TH, "Diagnostis Message"). + th(_TH, "Last 10 Diagnostis Messages"). + th(_TH, "Last 10 Failure Messages"). _(); - RMApp rmApp = rm.getRMContext().getRMApps().get(appId); if (rmApp != null) { Map logAggregationReports = rmApp.getLogAggregationReportsForApp(); @@ -136,10 +131,14 @@ protected void render(Block html) { String message = report.getValue() == null ? null : report.getValue() .getDiagnosticMessage(); + String failureMessage = + report.getValue() == null ? null : report.getValue() + .getFailureMessages(); table.tr() .td(report.getKey().toString()) .td(status == null ? "N/A" : status.toString()) - .td(message == null ? "N/A" : message)._(); + .td(message == null ? "N/A" : message) + .td(failureMessage == null ? "N/A" : failureMessage)._(); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 4eec63f..40fa80d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -23,7 +23,7 @@ import static org.mockito.Mockito.mock; import java.util.ArrayList; -import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -155,26 +155,26 @@ public void testLogAggregationStatus() throws Exception { .getLogAggregationStatus()); } - Map node1ReportForApp = - new HashMap(); + List node1ReportForApp = + new ArrayList(); String messageForNode1_1 = "node1 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report1 = - LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.RUNNING, messageForNode1_1); - node1ReportForApp.put(appId, report1); + LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, + messageForNode1_1, ""); + node1ReportForApp.add(report1); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node1ReportForApp)); - Map node2ReportForApp = - new HashMap(); + List node2ReportForApp = + new ArrayList(); String messageForNode2_1 = "node2 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report2 = - LogAggregationReport.newInstance(appId, nodeId2, - LogAggregationStatus.RUNNING, messageForNode2_1); - node2ReportForApp.put(appId, report2); + LogAggregationReport.newInstance(appId, + LogAggregationStatus.RUNNING, messageForNode2_1, ""); + node2ReportForApp.add(report2); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node2ReportForApp)); @@ -190,12 +190,12 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1, report.getValue() + Assert.assertEquals(messageForNode1_1 + "\n", report.getValue() .getDiagnosticMessage()); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode2_1, report.getValue() + Assert.assertEquals(messageForNode2_1 + "\n", report.getValue() .getDiagnosticMessage()); } else { // should not contain log aggregation report for other nodes @@ -205,14 +205,14 @@ public void testLogAggregationStatus() throws Exception { } // node1 updates its log aggregation status again - Map node1ReportForApp2 = - new HashMap(); + List node1ReportForApp2 = + new ArrayList(); String messageForNode1_2 = "node1 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report1_2 = - LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.RUNNING, messageForNode1_2); - node1ReportForApp2.put(appId, report1_2); + LogAggregationReport.newInstance(appId, + LogAggregationStatus.RUNNING, messageForNode1_2, ""); + node1ReportForApp2.add(report1_2); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node1ReportForApp2)); @@ -230,12 +230,13 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report - .getValue().getDiagnosticMessage()); + Assert.assertEquals( + messageForNode1_1 + "\n" + messageForNode1_2 + "\n", report + .getValue().getDiagnosticMessage()); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode2_1, report.getValue() + Assert.assertEquals(messageForNode2_1 + "\n", report.getValue() .getDiagnosticMessage()); } else { // should not contain log aggregation report for other nodes @@ -268,15 +269,22 @@ public void testLogAggregationStatus() throws Exception { // Finally, node1 finished its log aggregation and sent out its final // log aggregation status. The log aggregation status for node1 should // be changed from TIME_OUT to SUCCEEDED - Map node1ReportForApp3 = - new HashMap(); + List node1ReportForApp3 = + new ArrayList(); String messageForNode1_3 = "node1 final logAggregation status updated at " + System.currentTimeMillis(); - LogAggregationReport report1_3 = - LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.SUCCEEDED, messageForNode1_3); - node1ReportForApp3.put(appId, report1_3); + LogAggregationReport report1_3; + for (int i = 1; i < 10 ; i ++) { + report1_3 = + LogAggregationReport.newInstance(appId, + LogAggregationStatus.RUNNING, "test_message_" + i, ""); + node1ReportForApp3.add(report1_3); + } + node1ReportForApp3.add(LogAggregationReport.newInstance(appId, + LogAggregationStatus.SUCCEEDED, messageForNode1_3, "")); + // For every logAggregationReport cached in memory, we can only save at most + // 10 diagnostic messages/failure messages node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node1ReportForApp3)); @@ -290,8 +298,15 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1 + messageForNode1_2 - + messageForNode1_3, report.getValue().getDiagnosticMessage()); + StringBuilder builder = new StringBuilder(); + for (int i = 1; i < 10; i ++) { + builder.append("test_message_" + i); + builder.append("\n"); + } + builder.append(messageForNode1_3); + builder.append("\n"); + Assert.assertEquals(builder.toString(), report.getValue() + .getDiagnosticMessage()); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue() .getLogAggregationStatus()); @@ -301,6 +316,28 @@ public void testLogAggregationStatus() throws Exception { .fail("should not contain log aggregation report for other nodes"); } } + + // update log aggregationStatus for node2 as FAILED, + // so the log aggregation status for the App will become FAILED, + // and we only keep the log aggregation reports whose status is FAILED, + // so the log aggregation report for node1 will be removed. + List node2ReportForApp2 = + new ArrayList(); + LogAggregationReport report2_2 = + LogAggregationReport.newInstance(appId, + LogAggregationStatus.FAILED, "", "Fail_Message"); + node2ReportForApp2.add(report2_2); + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus + .newInstance(true, null, 0), new ArrayList(), null, + null, node2ReportForApp2)); + Assert.assertEquals(LogAggregationStatus.FAILED, + rmApp.getLogAggregationStatusForAppReport()); + logAggregationStatus = rmApp.getLogAggregationReportsForApp(); + Assert.assertTrue(logAggregationStatus.size() == 1); + Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID())); + Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID())); + Assert.assertEquals("Fail_Message" + "\n", + logAggregationStatus.get(node2.getNodeID()).getFailureMessages()); } @Test (timeout = 10000) @@ -317,9 +354,11 @@ public void testGetLogAggregationStatusForAppReport() { // Enable the log aggregation conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); rmApp = (RMAppImpl)createRMApp(conf); - // If we do not know any NodeManagers for this application , - // the log aggregation status will return null - Assert.assertNull(rmApp.getLogAggregationStatusForAppReport()); + // If we do not know any NodeManagers for this application , and + // the log aggregation is enabled, the log aggregation status will + // return NOT_START + Assert.assertEquals(LogAggregationStatus.NOT_START, + rmApp.getLogAggregationStatusForAppReport()); NodeId nodeId1 = NodeId.newInstance("localhost", 1111); NodeId nodeId2 = NodeId.newInstance("localhost", 2222); @@ -329,24 +368,32 @@ public void testGetLogAggregationStatusForAppReport() { // If the log aggregation status for all NMs are NOT_START, // the log aggregation status for this app will return NOT_START rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); Assert.assertEquals(LogAggregationStatus.NOT_START, rmApp.getLogAggregationStatusForAppReport()); rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, "")); + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, + "", "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); Assert.assertEquals(LogAggregationStatus.RUNNING, rmApp.getLogAggregationStatusForAppReport()); @@ -357,13 +404,17 @@ public void testGetLogAggregationStatusForAppReport() { // others are SUCCEEDED, the log aggregation status for this app will // return TIME_OUT rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, + "", "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); Assert.assertEquals(LogAggregationStatus.TIME_OUT, rmApp.getLogAggregationStatusForAppReport()); @@ -371,17 +422,38 @@ public void testGetLogAggregationStatusForAppReport() { // is at the final state, the log aggregation status for this app will // return SUCCEEDED rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "","")); Assert.assertEquals(LogAggregationStatus.SUCCEEDED, rmApp.getLogAggregationStatusForAppReport()); rmApp = (RMAppImpl)createRMApp(conf); + // If the log aggregation status for at least one of NMs are RUNNING, + // the log aggregation status for this app will return RUNNING + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, + "", "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, + "", "")); + Assert.assertEquals(LogAggregationStatus.RUNNING, + rmApp.getLogAggregationStatusForAppReport()); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp)); // If at least of one log aggregation status for one NM is FAILED, @@ -389,13 +461,17 @@ public void testGetLogAggregationStatusForAppReport() { // at the final state, the log aggregation status for this app // will return FAILED rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, + "", "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.getApplicationId(), LogAggregationStatus.FAILED, + "", "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, + "", "")); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport());