diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java index b2270d8..f7e4c2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -45,11 +47,13 @@ @Public @Unstable public static LogAggregationReport newInstance(ApplicationId appId, - NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) { + NodeId nodeId, LogAggregationStatus status, List diagnosticMessage, + List failureMessages) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); report.setLogAggregationStatus(status); report.setDiagnosticMessage(diagnosticMessage); + report.setFailureMessages(failureMessages); return report; } @@ -91,14 +95,27 @@ public abstract void setLogAggregationStatus( LogAggregationStatus logAggregationStatus); /** - * Get the diagnositic information of this log aggregation + * Get the diagnositic information of this log aggregation * @return diagnositic information of this log aggregation */ @Public @Unstable - public abstract String getDiagnosticMessage(); + public abstract List getDiagnosticMessage(); + + @Public + @Unstable + public abstract void setDiagnosticMessage(List diagnosticMessage); + + /** + * Get all the failure information for which the log aggregation + * fails in any cycles + * @return failure information + */ + @Public + @Unstable + public abstract List getFailureMessages(); @Public @Unstable - public abstract void setDiagnosticMessage(String diagnosticMessage); + public abstract void setFailureMessages(List failureMessages); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java index 75b6eab..414a5fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -47,6 +49,9 @@ private ApplicationId applicationId; private NodeId nodeId; + private List diagnosticMessages = null; + private List failureMessages = null; + public LogAggregationReportPBImpl() { builder = LogAggregationReportProto.newBuilder(); } @@ -95,6 +100,16 @@ private void mergeLocalToBuilder() { builder.getNodeId())) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } + + if (this.diagnosticMessages != null && !this.diagnosticMessages.isEmpty()) { + builder.clearDiagnostics(); + builder.addAllDiagnostics(this.diagnosticMessages); + } + + if (this.failureMessages != null && !this.failureMessages.isEmpty()) { + builder.clearFailureMessage(); + builder.addAllFailureMessage(this.failureMessages); + } } private void mergeLocalToProto() { @@ -174,22 +189,27 @@ private LogAggregationStatus convertFromProtoFormat( } @Override - public String getDiagnosticMessage() { - LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasDiagnostics()) { - return null; - } - return p.getDiagnostics(); + public List getDiagnosticMessage() { + initDiagnosticMessages(); + return this.diagnosticMessages; } @Override - public void setDiagnosticMessage(String diagnosticMessage) { + public void setDiagnosticMessage(List diagnosticMessages) { maybeInitBuilder(); - if (diagnosticMessage == null) { + if (diagnosticMessages == null || diagnosticMessages.isEmpty()) { builder.clearDiagnostics(); + } + this.diagnosticMessages = diagnosticMessages; + } + + private void initDiagnosticMessages() { + if (this.diagnosticMessages != null) { return; } - builder.setDiagnostics(diagnosticMessage); + LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; + this.diagnosticMessages = new ArrayList(); + this.diagnosticMessages.addAll(p.getDiagnosticsList()); } @Override @@ -221,4 +241,28 @@ private NodeIdProto convertToProtoFormat(NodeId t) { private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) { return new NodeIdPBImpl(nodeId); } + + @Override + public List getFailureMessages() { + initFailureMessages(); + return this.failureMessages; + } + + @Override + public void setFailureMessages(List failureMessages) { + maybeInitBuilder(); + if (failureMessages == null || failureMessages.isEmpty()) { + builder.clearFailureMessage(); + } + this.failureMessages = failureMessages; + } + + private void initFailureMessages() { + if (this.failureMessages != null) { + return; + } + LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; + this.failureMessages = new ArrayList(); + this.failureMessages.addAll(p.getFailureMessageList()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java index dd5a4c8..043e178 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; @@ -192,8 +193,17 @@ public ApplicationReport run() throws Exception { : "ApplicationMaster"); if (webUiType != null && webUiType.equals(YarnWebParams.RM_WEB_UI)) { - overviewTable._("Log Aggregation Status", - root_url("logaggregationstatus", app.getAppId()), "Status"); + LogAggregationStatus status = getLogAggregationStatus(); + if (status == null) { + overviewTable._("Log Aggregation Status", "N/A"); + } else if (status == LogAggregationStatus.DISABLED + || status == LogAggregationStatus.NOT_START + || status == LogAggregationStatus.SUCCEEDED) { + overviewTable._("Log Aggregation Status", status.name()); + } else { + overviewTable._("Log Aggregation Status", + root_url("logaggregationstatus", app.getAppId()), status.name()); + } } overviewTable._("Diagnostics:", app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo()); @@ -342,4 +352,8 @@ private String clairfyAppFinalStatus(FinalApplicationStatus status) { protected void createApplicationMetricsTable(Block html) { } + + protected LogAggregationStatus getLogAggregationStatus() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d34c9f7..ca769c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -59,10 +59,11 @@ message LogAggregationReportsForAppsProto { } message LogAggregationReportProto { -optional ApplicationIdProto application_id = 1; -optional NodeIdProto node_id = 2; -optional LogAggregationStatusProto log_aggregation_status = 3; -optional string diagnostics = 4 [default = "N/A"]; + optional ApplicationIdProto application_id = 1; + optional NodeIdProto node_id = 2; + optional LogAggregationStatusProto log_aggregation_status = 3; + repeated string diagnostics = 4; + repeated string failure_message = 5; } message NodeHeartbeatResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 42a4234..9cde457 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LogAggregationReportInNM; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -80,6 +80,6 @@ void setDecommissioned(boolean isDecommissioned); - ConcurrentLinkedQueue + ConcurrentLinkedQueue getLogAggregationStatusForApps(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 0bac8d7..04e9e54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -54,8 +54,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LogAggregationReportInNM; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -386,7 +386,7 @@ public void run() { .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); private final NMStateStoreService stateStore; private boolean isDecommissioned = false; - private final ConcurrentLinkedQueue + private final ConcurrentLinkedQueue logAggregationReportForApps; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, @@ -402,7 +402,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); this.stateStore = stateStore; this.logAggregationReportForApps = new ConcurrentLinkedQueue< - LogAggregationReport>(); + LogAggregationReportInNM>(); } /** @@ -496,7 +496,7 @@ public void setSystemCrendentialsForApps( } @Override - public ConcurrentLinkedQueue + public ConcurrentLinkedQueue getLogAggregationStatusForApps() { return this.logAggregationReportForApps; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b1ab5f1..c875bda 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -22,6 +22,7 @@ import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LogAggregationReportInNM; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -120,7 +123,7 @@ private boolean logAggregationEnabled; - private final List logAggregationReportForAppsTempList; + private final List logAggregationReportForAppsTempList; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -152,7 +155,7 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.pendingCompletedContainers = new HashMap(); this.logAggregationReportForAppsTempList = - new ArrayList(); + new ArrayList(); } @Override @@ -811,14 +814,14 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { private Map getLogAggregationReportsForApps( - ConcurrentLinkedQueue lastestLogAggregationStatus) { + ConcurrentLinkedQueue lastestLogAggregationStatus) { Map latestLogAggregationReports = new HashMap(); - LogAggregationReport status; + LogAggregationReportInNM status; while ((status = lastestLogAggregationStatus.poll()) != null) { this.logAggregationReportForAppsTempList.add(status); } - for (LogAggregationReport logAggregationReport + for (LogAggregationReportInNM logAggregationReport : this.logAggregationReportForAppsTempList) { LogAggregationReport report = null; if (latestLogAggregationReports.containsKey(logAggregationReport @@ -826,26 +829,32 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { report = latestLogAggregationReports.get(logAggregationReport .getApplicationId()); - report.setLogAggregationStatus(logAggregationReport - .getLogAggregationStatus()); - String message = report.getDiagnosticMessage(); - if (logAggregationReport.getDiagnosticMessage() != null - && !logAggregationReport.getDiagnosticMessage().isEmpty()) { - if (message != null) { - message += logAggregationReport.getDiagnosticMessage(); + } else { + report = Records.newRecord(LogAggregationReport.class); + } + report.setApplicationId(logAggregationReport.getApplicationId()); + report.setNodeId(this.nodeId); + report.setLogAggregationStatus(logAggregationReport + .getLogAggregationStatus()); + if (logAggregationReport.getDiagnosticMessage() != null + && !logAggregationReport.getDiagnosticMessage().isEmpty()) { + if (logAggregationReport.getLogAggregationStatusInThisCycle() + == LogAggregationStatus.FAILED) { + if (report.getFailureMessages() == null) { + report.setFailureMessages(Arrays.asList(logAggregationReport + .getDiagnosticMessage())); } else { - message = logAggregationReport.getDiagnosticMessage(); + report.getFailureMessages().add( + logAggregationReport.getDiagnosticMessage()); } - report.setDiagnosticMessage(message); } - } else { - report = Records.newRecord(LogAggregationReport.class); - report.setApplicationId(logAggregationReport.getApplicationId()); - report.setNodeId(this.nodeId); - report.setLogAggregationStatus(logAggregationReport - .getLogAggregationStatus()); - report - .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage()); + if (report.getDiagnosticMessage() == null) { + report.setDiagnosticMessage(Arrays.asList(logAggregationReport + .getDiagnosticMessage())); + } else { + report.getDiagnosticMessage().add( + logAggregationReport.getDiagnosticMessage()); + } } latestLogAggregationReports.put(logAggregationReport.getApplicationId(), report); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LogAggregationReportInNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LogAggregationReportInNM.java new file mode 100644 index 0000000..af9ac61 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LogAggregationReportInNM.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; + +public interface LogAggregationReportInNM { + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId(ApplicationId appId); + + public abstract NodeId getNodeId(); + + public abstract void setNodeId(NodeId nodeId); + + public abstract LogAggregationStatus getLogAggregationStatusInThisCycle(); + + public abstract void setLogAggregationStatusInThisCycle( + LogAggregationStatus logAggregationStatusInThisCycle); + + public abstract LogAggregationStatus getLogAggregationStatus(); + + public abstract void + setLogAggregationStatus(LogAggregationStatus logAggregationStatus); + + public abstract String getDiagnosticMessage(); + + public abstract void setDiagnosticMessage(String diagnosticMessage); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LogAggregationReportInNMPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LogAggregationReportInNMPBImpl.java new file mode 100644 index 0000000..121c7bc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LogAggregationReportInNMPBImpl.java @@ -0,0 +1,244 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LogAggregationReportInNMProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LogAggregationReportInNMProtoOrBuilder; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LogAggregationReportInNM; + +import com.google.protobuf.TextFormat; + +public class LogAggregationReportInNMPBImpl extends + ProtoBase implements + LogAggregationReportInNM { + + LogAggregationReportInNMProto proto = LogAggregationReportInNMProto + .getDefaultInstance(); + LogAggregationReportInNMProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId; + private NodeId nodeId; + + public LogAggregationReportInNMPBImpl() { + builder = LogAggregationReportInNMProto.newBuilder(); + } + + public LogAggregationReportInNMPBImpl(LogAggregationReportInNMProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public LogAggregationReportInNMProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null + && !((ApplicationIdPBImpl) this.applicationId).getProto().equals( + builder.getApplicationId())) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + + if (this.nodeId != null + && !((NodeIdPBImpl) this.nodeId).getProto().equals( + builder.getNodeId())) { + builder.setNodeId(convertToProtoFormat(this.nodeId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LogAggregationReportInNMProto.newBuilder(proto); + } + viaProto = false; + } + @Override + public ApplicationId getApplicationId() { + if (this.applicationId != null) { + return this.applicationId; + } + + LogAggregationReportInNMProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setApplicationId(ApplicationId appId) { + maybeInitBuilder(); + if (appId == null) + builder.clearApplicationId(); + this.applicationId = appId; + } + + @Override + public NodeId getNodeId() { + if (this.nodeId != null) { + return this.nodeId; + } + + LogAggregationReportInNMProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeId()) { + return null; + } + this.nodeId = convertFromProtoFormat(p.getNodeId()); + return this.nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + maybeInitBuilder(); + if (nodeId == null) + builder.clearNodeId(); + this.nodeId = nodeId; + } + + @Override + public LogAggregationStatus getLogAggregationStatusInThisCycle() { + LogAggregationReportInNMProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasLogAggregationStatusInThisCycle()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationStatusInThisCycle()); + } + + @Override + public void setLogAggregationStatusInThisCycle( + LogAggregationStatus logAggregationStatusInThisCycle) { + maybeInitBuilder(); + if (logAggregationStatusInThisCycle == null) { + builder.clearLogAggregationStatusInThisCycle(); + return; + } + builder.setLogAggregationStatusInThisCycle( + convertToProtoFormat(logAggregationStatusInThisCycle)); + } + + @Override + public LogAggregationStatus getLogAggregationStatus() { + LogAggregationReportInNMProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasLogAggregationStatus()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationStatus()); + } + + @Override + public void + setLogAggregationStatus(LogAggregationStatus logAggregationStatus) { + maybeInitBuilder(); + if (logAggregationStatus == null) { + builder.clearLogAggregationStatus(); + return; + } + builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus)); + } + + @Override + public String getDiagnosticMessage() { + LogAggregationReportInNMProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnosticMessage(String diagnosticMessage) { + maybeInitBuilder(); + if (diagnosticMessage == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnosticMessage); + + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + private ApplicationIdPBImpl convertFromProtoFormat( + ApplicationIdProto applicationId) { + return new ApplicationIdPBImpl(applicationId); + } + + private NodeIdProto convertToProtoFormat(NodeId t) { + return ((NodeIdPBImpl) t).getProto(); + } + + private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) { + return new NodeIdPBImpl(nodeId); + } + + private LogAggregationStatus convertFromProtoFormat( + LogAggregationStatusProto s) { + return ProtoUtils.convertFromProtoFormat(s); + } + + private LogAggregationStatusProto + convertToProtoFormat(LogAggregationStatus s) { + return ProtoUtils.convertToProtoFormat(s); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 3f09e5d..a29a29d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -58,10 +58,10 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; -import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LogAggregationReportInNM; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -307,6 +307,7 @@ private void uploadLogsForContainers(boolean appFinished) { String diagnosticMessage = ""; final boolean rename = uploadedLogsInThisCycle; + LogAggregationStatus statusInThisCycle = LogAggregationStatus.SUCCEEDED; try { userUgi.doAs(new PrivilegedExceptionAction() { @Override @@ -337,13 +338,15 @@ public Object run() throws Exception { + " in NodeManager: " + LogAggregationUtils.getNodeString(nodeId) + " at " + Times.format(currentTime) + "\n"; + statusInThisCycle = LogAggregationStatus.FAILED; renameTemporaryLogFileFailed = true; } - LogAggregationReport report = - Records.newRecord(LogAggregationReport.class); + LogAggregationReportInNM report = + Records.newRecord(LogAggregationReportInNM.class); report.setApplicationId(appId); report.setNodeId(nodeId); + report.setLogAggregationStatusInThisCycle(statusInThisCycle); report.setDiagnosticMessage(diagnosticMessage); if (appFinished) { report.setLogAggregationStatus(renameTemporaryLogFileFailed diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto index 6fde7cc..668432a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto @@ -57,3 +57,11 @@ message LocalizerHeartbeatResponseProto { optional LocalizerActionProto action = 1; repeated ResourceLocalizationSpecProto resources = 2; } + +message LogAggregationReportInNMProto { + optional ApplicationIdProto application_id = 1; + optional NodeIdProto node_id = 2; + optional LogAggregationStatusProto log_aggregation_status_in_this_cycle = 3; + optional LogAggregationStatusProto log_aggregation_status = 4; + optional string diagnostics = 5; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index b4e4965..26c4edd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -27,6 +27,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; @@ -106,6 +107,7 @@ private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; + private static int MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY = 10; // Immutable fields private final ApplicationId applicationId; @@ -152,6 +154,8 @@ private final Map logAggregationStatus = new HashMap(); private LogAggregationStatus logAggregationStatusForAppReport; + private int logAggregationSucceed = 0; + private int logAggregationFailed = 0; // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -437,6 +441,11 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + if (this.logAggregationEnabled) { + logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; + } else { + logAggregationStatusForAppReport = LogAggregationStatus.DISABLED; + } } @Override @@ -837,7 +846,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent .getNodeId(), app.logAggregationEnabled ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, - "")); + null, null)); } }; } @@ -1401,18 +1410,20 @@ protected Credentials parseCredentials() throws IOException { Map outputs = new HashMap(); outputs.putAll(logAggregationStatus); - for (Entry output : outputs.entrySet()) { - if (!output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.TIME_OUT) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { - output.getValue().setLogAggregationStatus( - LogAggregationStatus.TIME_OUT); + if (!isLogAggregationFinished()) { + for (Entry output : outputs.entrySet()) { + if (!output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.TIME_OUT) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.FAILED) + && isAppInFinalState(this) + && System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + output.getValue().setLogAggregationStatus( + LogAggregationStatus.TIME_OUT); + } } } return outputs; @@ -1424,30 +1435,66 @@ protected Credentials parseCredentials() throws IOException { public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { try { this.writeLock.lock(); - if (this.logAggregationEnabled) { + if (this.logAggregationEnabled + && !this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.SUCCEEDED) + && !this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.FAILED)) { LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); + boolean statusChanged = false; if (curReport == null) { + checkLogAggregationDiagnosticLimit(report); this.logAggregationStatus.put(nodeId, report); + statusChanged = true; } else { - if (curReport.getLogAggregationStatus().equals( - LogAggregationStatus.TIME_OUT)) { - if (report.getLogAggregationStatus().equals( - LogAggregationStatus.SUCCEEDED) - || report.getLogAggregationStatus().equals( - LogAggregationStatus.FAILED)) { - curReport.setLogAggregationStatus(report - .getLogAggregationStatus()); - } - } else { + if (!curReport.getLogAggregationStatus().equals( + LogAggregationStatus.SUCCEEDED) + && !curReport.getLogAggregationStatus().equals( + LogAggregationStatus.FAILED)) { curReport.setLogAggregationStatus(report.getLogAggregationStatus()); + if (report.getDiagnosticMessage() != null) { + curReport.getDiagnosticMessage().addAll( + report.getDiagnosticMessage()); + } + if (report.getFailureMessages() != null) { + curReport.getFailureMessages() + .addAll(report.getFailureMessages()); + } + checkLogAggregationDiagnosticLimit(curReport); + statusChanged = true; } - - if (report.getDiagnosticMessage() != null - && !report.getDiagnosticMessage().isEmpty()) { - curReport - .setDiagnosticMessage(curReport.getDiagnosticMessage() == null - ? report.getDiagnosticMessage() : curReport - .getDiagnosticMessage() + report.getDiagnosticMessage()); + } + if (isAppInFinalState(this) && statusChanged) { + LogAggregationStatus status = + this.logAggregationStatus.get(nodeId).getLogAggregationStatus(); + if (status.equals(LogAggregationStatus.SUCCEEDED)) { + this.logAggregationSucceed++; + } else if (status.equals(LogAggregationStatus.FAILED)) { + this.logAggregationFailed++; + } + if (this.logAggregationSucceed == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; + // Since the log aggregation status for this application for all NMs + // is SUCCEEDED, it means all logs are aggregated successfully. + // We could remove all the cached log aggregation reports + this.logAggregationStatus.clear(); + } else if (this.logAggregationSucceed + this.logAggregationFailed + == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; + // We have collected the log aggregation status for all NMs. + // The log aggregation status is FAILED which means the log + // aggregation fails in some NMs. We are only interested in the + // nodes where the log aggregation is failed. So we could remove + // the log aggregation details for those succeeded NMs + for (Iterator> it = + this.logAggregationStatus.entrySet().iterator(); it.hasNext();) { + Map.Entry entry = it.next(); + if (entry.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED)) { + it.remove(); + } + } } } } @@ -1458,19 +1505,15 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { @Override public LogAggregationStatus getLogAggregationStatusForAppReport() { - if (!logAggregationEnabled) { - return LogAggregationStatus.DISABLED; - } - if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED - || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) { - return this.logAggregationStatusForAppReport; - } try { this.readLock.lock(); + if (isLogAggregationFinished()) { + return this.logAggregationStatusForAppReport; + } Map reports = getLogAggregationReportsForApp(); if (reports.size() == 0) { - return null; + return this.logAggregationStatusForAppReport; } int logNotStartCount = 0; int logCompletedCount = 0; @@ -1506,13 +1549,11 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { // the log aggregation is finished. And the log aggregation status will // not be updated anymore. if (logFailedCount > 0 && isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED; } else if (logTimeOutCount > 0) { return LogAggregationStatus.TIME_OUT; } if (isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED; } } @@ -1521,4 +1562,45 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { this.readLock.unlock(); } } + + private void checkLogAggregationDiagnosticLimit(LogAggregationReport report) { + if (report.getDiagnosticMessage() != null + && !report.getDiagnosticMessage().isEmpty()) { + if (report.getDiagnosticMessage().size() + > MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY) { + int delta = + report.getDiagnosticMessage().size() + - MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY; + while (delta > 0) { + report.getDiagnosticMessage().remove(0); + delta--; + } + } + } + if (report.getFailureMessages() != null + && !report.getFailureMessages().isEmpty()) { + if (report.getFailureMessages().size() + > MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY) { + int delta = + report.getFailureMessages().size() + - MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY; + while (delta > 0) { + report.getFailureMessages().remove(0); + delta--; + } + } + } + } + + private boolean isLogAggregationFinished() { + try { + this.readLock.lock(); + return this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.SUCCEEDED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.FAILED); + } finally { + this.readLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java index 3779b91..caac52e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -199,4 +200,13 @@ public ContainerReport run() throws Exception { tbody._()._(); } + + @Override + protected LogAggregationStatus getLogAggregationStatus() { + RMApp rmApp = this.rm.getRMContext().getRMApps().get(appID); + if (rmApp == null) { + return null; + } + return rmApp.getLogAggregationStatusForAppReport(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java index a2f61e3..6affa1d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java @@ -28,11 +28,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -49,14 +47,11 @@ private static final Log LOG = LogFactory .getLog(RMAppLogAggregationStatusBlock.class); private final ResourceManager rm; - private final Configuration conf; @Inject - RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm, - Configuration conf) { + RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm) { super(ctx); this.rm = rm; - this.conf = conf; } @Override @@ -106,24 +101,25 @@ protected void render(Block html) { table_description._(); div_description._(); - boolean logAggregationEnabled = - conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + RMApp rmApp = rm.getRMContext().getRMApps().get(appId); + // Application Log aggregation status Table DIV div = html.div(_INFO_WRAP); TABLE> table = div.h3( "Log Aggregation: " - + (logAggregationEnabled ? "Enabled" : "Disabled")).table( + + (rmApp == null ? "N/A" : rmApp + .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp + .getLogAggregationStatusForAppReport().name())).table( "#LogAggregationStatus"); table. tr(). th(_TH, "NodeId"). th(_TH, "Log Aggregation Status"). - th(_TH, "Diagnostis Message"). + th(_TH, "Last 10 Diagnostis Messages"). + th(_TH, "Last 10 Failure Messages"). _(); - RMApp rmApp = rm.getRMContext().getRMApps().get(appId); if (rmApp != null) { Map logAggregationReports = rmApp.getLogAggregationReportsForApp(); @@ -133,13 +129,24 @@ protected void render(Block html) { LogAggregationStatus status = report.getValue() == null ? null : report.getValue() .getLogAggregationStatus(); - String message = - report.getValue() == null ? null : report.getValue() - .getDiagnosticMessage(); + String messages = "N/A"; + if (report.getValue().getDiagnosticMessage() != null + && !report.getValue().getDiagnosticMessage().isEmpty()) { + for (String message : report.getValue().getDiagnosticMessage()) { + messages += message + "\n"; + } + } + String failurMessages = "N/A"; + if (report.getValue().getFailureMessages() != null + && !report.getValue().getFailureMessages().isEmpty()) { + for (String message : report.getValue().getFailureMessages()) { + failurMessages += message + "\n"; + } + } table.tr() .td(report.getKey().toString()) .td(status == null ? "N/A" : status.toString()) - .td(message == null ? "N/A" : message)._(); + .td(messages).td(failurMessages)._(); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 4eec63f..96d77fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -23,7 +23,10 @@ import static org.mockito.Mockito.mock; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -65,6 +68,7 @@ public class TestRMAppLogAggregationStatus { + private static final List EMPTY_LIST = Collections.emptyList(); private RMContext rmContext; private YarnScheduler scheduler; @@ -161,7 +165,8 @@ public void testLogAggregationStatus() throws Exception { "node1 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report1 = LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.RUNNING, messageForNode1_1); + LogAggregationStatus.RUNNING, Arrays.asList(messageForNode1_1), + EMPTY_LIST); node1ReportForApp.put(appId, report1); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, @@ -173,7 +178,8 @@ public void testLogAggregationStatus() throws Exception { "node2 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report2 = LogAggregationReport.newInstance(appId, nodeId2, - LogAggregationStatus.RUNNING, messageForNode2_1); + LogAggregationStatus.RUNNING, Arrays.asList(messageForNode2_1), + EMPTY_LIST); node2ReportForApp.put(appId, report2); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, @@ -190,13 +196,15 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); + Assert.assertTrue(report.getValue().getDiagnosticMessage().size() == 1); Assert.assertEquals(messageForNode1_1, report.getValue() - .getDiagnosticMessage()); + .getDiagnosticMessage().get(0)); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); + Assert.assertTrue(report.getValue().getDiagnosticMessage().size() == 1); Assert.assertEquals(messageForNode2_1, report.getValue() - .getDiagnosticMessage()); + .getDiagnosticMessage().get(0)); } else { // should not contain log aggregation report for other nodes Assert @@ -211,7 +219,8 @@ public void testLogAggregationStatus() throws Exception { "node1 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report1_2 = LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.RUNNING, messageForNode1_2); + LogAggregationStatus.RUNNING, Arrays.asList(messageForNode1_2), + EMPTY_LIST); node1ReportForApp2.put(appId, report1_2); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, @@ -230,13 +239,17 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report - .getValue().getDiagnosticMessage()); + Assert.assertTrue(report.getValue().getDiagnosticMessage().size() == 2); + Assert.assertEquals(messageForNode1_1, report.getValue() + .getDiagnosticMessage().get(0)); + Assert.assertEquals(messageForNode1_2, report.getValue() + .getDiagnosticMessage().get(1)); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); + Assert.assertTrue(report.getValue().getDiagnosticMessage().size() == 1); Assert.assertEquals(messageForNode2_1, report.getValue() - .getDiagnosticMessage()); + .getDiagnosticMessage().get(0)); } else { // should not contain log aggregation report for other nodes Assert @@ -270,12 +283,20 @@ public void testLogAggregationStatus() throws Exception { // be changed from TIME_OUT to SUCCEEDED Map node1ReportForApp3 = new HashMap(); + List messages = new ArrayList(); String messageForNode1_3 = "node1 final logAggregation status updated at " + System.currentTimeMillis(); + messages.add(messageForNode1_3); + for (int i = 1; i < 10 ; i ++) { + messages.add("test_message_" + i); + } + + // For every logAggregationReport cached in memory, we can only save at most + // 10 diagnostic messages/failure messages LogAggregationReport report1_3 = LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.SUCCEEDED, messageForNode1_3); + LogAggregationStatus.SUCCEEDED, messages, EMPTY_LIST); node1ReportForApp3.put(appId, report1_3); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, @@ -290,8 +311,14 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1 + messageForNode1_2 - + messageForNode1_3, report.getValue().getDiagnosticMessage()); + Assert + .assertTrue(report.getValue().getDiagnosticMessage().size() == 10); + Assert.assertEquals(messageForNode1_3, report.getValue() + .getDiagnosticMessage().get(0)); + for (int i = 1; i < 10; i ++) { + Assert.assertEquals("test_message_" + i, report.getValue() + .getDiagnosticMessage().get(i)); + } } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue() .getLogAggregationStatus()); @@ -301,6 +328,27 @@ public void testLogAggregationStatus() throws Exception { .fail("should not contain log aggregation report for other nodes"); } } + + // update log aggregationStatus for node2 as FAILED, + // so the log aggregation status for the App will become FAILED, + // and we only keep the log aggregation reports whose status is FAILED, + // so the log aggregation report for node1 will be removed. + Map node2ReportForApp2 = + new HashMap(); + LogAggregationReport report2_2 = + LogAggregationReport.newInstance(appId, nodeId2, + LogAggregationStatus.FAILED, EMPTY_LIST, + Arrays.asList("Fail_Message_1", "Fail_Message_2")); + node2ReportForApp2.put(appId, report2_2); + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus + .newInstance(true, null, 0), new ArrayList(), null, + null, node2ReportForApp2)); + Assert.assertEquals(LogAggregationStatus.FAILED, + rmApp.getLogAggregationStatusForAppReport()); + logAggregationStatus = rmApp.getLogAggregationReportsForApp(); + Assert.assertTrue(logAggregationStatus.size() == 1); + Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID())); + Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID())); } @Test (timeout = 10000) @@ -317,9 +365,11 @@ public void testGetLogAggregationStatusForAppReport() { // Enable the log aggregation conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); rmApp = (RMAppImpl)createRMApp(conf); - // If we do not know any NodeManagers for this application , - // the log aggregation status will return null - Assert.assertNull(rmApp.getLogAggregationStatusForAppReport()); + // If we do not know any NodeManagers for this application , and + // the log aggregation is enabled, the log aggregation status will + // return NOT_START + Assert.assertEquals(LogAggregationStatus.NOT_START, + rmApp.getLogAggregationStatusForAppReport()); NodeId nodeId1 = NodeId.newInstance("localhost", 1111); NodeId nodeId2 = NodeId.newInstance("localhost", 2222); @@ -329,24 +379,32 @@ public void testGetLogAggregationStatusForAppReport() { // If the log aggregation status for all NMs are NOT_START, // the log aggregation status for this app will return NOT_START rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); Assert.assertEquals(LogAggregationStatus.NOT_START, rmApp.getLogAggregationStatusForAppReport()); rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); Assert.assertEquals(LogAggregationStatus.RUNNING, rmApp.getLogAggregationStatusForAppReport()); @@ -357,13 +415,17 @@ public void testGetLogAggregationStatusForAppReport() { // others are SUCCEEDED, the log aggregation status for this app will // return TIME_OUT rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); Assert.assertEquals(LogAggregationStatus.TIME_OUT, rmApp.getLogAggregationStatusForAppReport()); @@ -371,17 +433,38 @@ public void testGetLogAggregationStatusForAppReport() { // is at the final state, the log aggregation status for this app will // return SUCCEEDED rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST,EMPTY_LIST)); Assert.assertEquals(LogAggregationStatus.SUCCEEDED, rmApp.getLogAggregationStatusForAppReport()); rmApp = (RMAppImpl)createRMApp(conf); + // If the log aggregation status for at least one of NMs are RUNNING, + // the log aggregation status for this app will return RUNNING + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, + EMPTY_LIST, EMPTY_LIST)); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, + EMPTY_LIST, EMPTY_LIST)); + Assert.assertEquals(LogAggregationStatus.RUNNING, + rmApp.getLogAggregationStatusForAppReport()); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp)); // If at least of one log aggregation status for one NM is FAILED, @@ -389,13 +472,17 @@ public void testGetLogAggregationStatusForAppReport() { // at the final state, the log aggregation status for this app // will return FAILED rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, + EMPTY_LIST, EMPTY_LIST)); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, + EMPTY_LIST, EMPTY_LIST)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport());