diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationReport.java
new file mode 100644
index 0000000..ee61715
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationReport.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code LogAggregationReport} is a report for log aggregation status
+ * in one NodeManager of an application.
+ * <p>
+ * It includes details such as:
+ * <ul>
+ * <li>{@link ApplicationId} of the application.</li>
+ * <li>{@link NodeId} of the NodeManager.</li>
+ * <li>{@link LogAggregationStatus}</li>
+ * <li>Diagnostic information</li>
+ * </ul>
+ *
+ */
+@Public
+@Unstable
+public abstract class LogAggregationReport {
+
+ @Public
+ @Unstable
+ public static LogAggregationReport newInstance(ApplicationId appId,
+ NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+ LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
+ report.setApplicationId(appId);
+ report.setNodeId(nodeId);
+ report.setLogAggregationStatus(status);
+ report.setDiagnosticMessage(diagnosticMessage);
+ return report;
+ }
+
+ /**
+ * Get the ApplicationId of the application.
+ * @return ApplicationId of the application
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationId getApplicationId();
+
+ @Public
+ @Unstable
+ public abstract void setApplicationId(ApplicationId appId);
+
+ /**
+ * Get the NodeId.
+ * @return NodeId
+ */
+ @Public
+ @Unstable
+ public abstract NodeId getNodeId();
+
+ @Public
+ @Unstable
+ public abstract void setNodeId(NodeId nodeId);
+
+ /**
+ * Get the LogAggregationStatus.
+ * @return LogAggregationStatus
+ */
+ @Public
+ @Unstable
+ public abstract LogAggregationStatus getLogAggregationStatus();
+
+ @Public
+ @Unstable
+ public abstract void setLogAggregationStatus(
+ LogAggregationStatus logAggregationStatus);
+
+ /**
+ * Get the diagnostic information of this log aggregation.
+ * @return diagnostic information of this log aggregation
+ */
+ @Public
+ @Unstable
+ public abstract String getDiagnosticMessage();
+
+ @Public
+ @Unstable
+ public abstract void setDiagnosticMessage(String diagnosticMessage);
+}
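A minimal sketch (not part of the patch) of how a caller could build one of these records, assuming the protobuf record factory and LogAggregationReportPBImpl from yarn-common are on the classpath; the application and node identifiers are illustrative:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;

public class LogAggregationReportSketch {
  public static void main(String[] args) {
    // Illustrative identifiers; inside the NM/RM these come from the running cluster.
    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    NodeId nodeId = NodeId.newInstance("nm1.example.com", 45454);

    LogAggregationReport report = LogAggregationReport.newInstance(appId, nodeId,
        LogAggregationStatus.RUNNING, "one upload cycle completed");

    System.out.println(report.getApplicationId() + " on " + report.getNodeId()
        + ": " + report.getLogAggregationStatus());
  }
}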
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
new file mode 100644
index 0000000..9b30d84
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+/**
+ * Status of Log aggregation.
+ */
+public enum LogAggregationStatus {
+ DISABLED,
+ NOT_START,
+ RUNNING,
+ FINISHED,
+ TIME_OUT
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 13e9a10..a9edf45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -742,6 +742,16 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
/**
+ * How long the ResourceManager waits for a NodeManager to report its
+ * log aggregation status. If the NodeManager has not reported its log
+ * aggregation status within this time, the RM reports the log aggregation
+ * status for that NodeManager as TIME_OUT.
+ */
+ public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS =
+ YARN_PREFIX + "log-aggregation-status.time-out.ms";
+ public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS = 10*60*1000;
+
+ /**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled
*/
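A short sketch of how the new key is expected to be read; it mirrors the guard RMAppImpl applies later in this patch, falling back to the default when the configured value is not positive (class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class LogAggregationTimeoutSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    long timeoutMs = conf.getLong(
        YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
    // Non-positive values fall back to the 10-minute default, as RMAppImpl does below.
    if (timeoutMs <= 0) {
      timeoutMs = YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
    }
    System.out.println("log aggregation status timeout: " + timeoutMs + " ms");
  }
}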
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index b396f4d..f967b9e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -253,6 +253,21 @@ message NodeLabelProto {
optional bool isExclusive = 2 [default = true];
}
+enum LogAggregationStatusProto {
+ LOG_DISABLED = 1;
+ LOG_NOT_START = 2;
+ LOG_RUNNING = 3;
+ LOG_FINISHED = 4;
+ LOG_TIME_OUT = 5;
+}
+
+message LogAggregationReportProto {
+ optional ApplicationIdProto application_id = 1;
+ optional NodeIdProto node_id = 2;
+ optional LogAggregationStatusProto log_aggregation_status = 3;
+ optional string diagnostics = 4 [default = "N/A"];
+}
+
////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationReportPBImpl.java
new file mode 100644
index 0000000..ad604dc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationReportPBImpl.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationReportProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogAggregationReportPBImpl extends LogAggregationReport {
+ LogAggregationReportProto proto = LogAggregationReportProto
+ .getDefaultInstance();
+ LogAggregationReportProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ApplicationId applicationId;
+ private NodeId nodeId;
+
+ public LogAggregationReportPBImpl() {
+ builder = LogAggregationReportProto.newBuilder();
+ }
+
+ public LogAggregationReportPBImpl(LogAggregationReportProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogAggregationReportProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.applicationId != null
+ && !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
+ builder.getApplicationId())) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+
+ if (this.nodeId != null
+ && !((NodeIdPBImpl) this.nodeId).getProto().equals(
+ builder.getNodeId())) {
+ builder.setNodeId(convertToProtoFormat(this.nodeId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogAggregationReportProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return this.applicationId;
+ }
+
+ @Override
+ public void setApplicationId(ApplicationId appId) {
+ maybeInitBuilder();
+ if (appId == null)
+ builder.clearApplicationId();
+ this.applicationId = appId;
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(
+ ApplicationIdProto applicationId) {
+ return new ApplicationIdPBImpl(applicationId);
+ }
+
+ @Override
+ public LogAggregationStatus getLogAggregationStatus() {
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasLogAggregationStatus()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationStatus());
+ }
+
+ @Override
+ public void
+ setLogAggregationStatus(LogAggregationStatus logAggregationStatus) {
+ maybeInitBuilder();
+ if (logAggregationStatus == null) {
+ builder.clearLogAggregationStatus();
+ return;
+ }
+ builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus));
+ }
+
+ private LogAggregationStatus convertFromProtoFormat(
+ LogAggregationStatusProto s) {
+ return ProtoUtils.convertFromProtoFormat(s);
+ }
+
+ private LogAggregationStatusProto
+ convertToProtoFormat(LogAggregationStatus s) {
+ return ProtoUtils.convertToProtoFormat(s);
+ }
+
+ @Override
+ public String getDiagnosticMessage() {
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasDiagnostics()) {
+ return null;
+ }
+ return p.getDiagnostics();
+ }
+
+ @Override
+ public void setDiagnosticMessage(String diagnosticMessage) {
+ maybeInitBuilder();
+ if (diagnosticMessage == null) {
+ builder.clearDiagnostics();
+ return;
+ }
+ builder.setDiagnostics(diagnosticMessage);
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ if (this.nodeId != null) {
+ return this.nodeId;
+ }
+
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeId()) {
+ return null;
+ }
+ this.nodeId = convertFromProtoFormat(p.getNodeId());
+ return this.nodeId;
+ }
+
+ @Override
+ public void setNodeId(NodeId nodeId) {
+ maybeInitBuilder();
+ if (nodeId == null)
+ builder.clearNodeId();
+ this.nodeId = nodeId;
+ }
+
+ private NodeIdProto convertToProtoFormat(NodeId t) {
+ return ((NodeIdPBImpl) t).getProto();
+ }
+
+ private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
+ return new NodeIdPBImpl(nodeId);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 586e9dd..a0f70df 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -44,6 +45,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto;
@@ -253,4 +255,20 @@ public static ReservationRequestInterpreter convertFromProtoFormat(
return ReservationRequestInterpreter.valueOf(e.name());
}
+ /*
+ * LogAggregation Status
+ */
+ private static String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
+
+ public static LogAggregationStatusProto convertToProtoFormat(
+ LogAggregationStatus e) {
+ return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX
+ + e.name());
+ }
+
+ public static LogAggregationStatus convertFromProtoFormat(
+ LogAggregationStatusProto e) {
+ return LogAggregationStatus.valueOf(e.name().replace(
+ LOGAGGREGATION_STATUS_PREFIX, ""));
+ }
}
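A sketch of the round trip these helpers provide: the LOG_ prefix is prepended when mapping the Java enum to LogAggregationStatusProto and stripped on the way back (class name is illustrative):

import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;

public class LogAggregationStatusConversionSketch {
  public static void main(String[] args) {
    // RUNNING <-> LOG_RUNNING
    LogAggregationStatusProto proto =
        ProtoUtils.convertToProtoFormat(LogAggregationStatus.RUNNING);
    LogAggregationStatus roundTripped = ProtoUtils.convertFromProtoFormat(proto);
    System.out.println(proto + " -> " + roundTripped);
  }
}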
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index b80d9ce..873127c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@@ -51,4 +54,10 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);
+
+ public abstract Map<ApplicationId, LogAggregationReport>
+ getLogAggregationReportsForApps();
+
+ public abstract void setLogAggregationReportsForApps(
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 16d47f9..a570e3d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,13 +18,23 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationReportPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -42,6 +52,8 @@
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null;
+ private Map<ApplicationId, LogAggregationReport>
+ logAggregationReportsForApps = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@@ -91,6 +103,25 @@ private void mergeLocalToBuilder() {
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
+ if (this.logAggregationReportsForApps != null) {
+ addLogAggregationStatusForAppsToProto();
+ }
+ }
+
+ private void addLogAggregationStatusForAppsToProto() {
+ maybeInitBuilder();
+ builder.clearLogAggregationReportsForApps();
+ for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps
+ .entrySet()) {
+ builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
+ .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
+ .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+ }
+ }
+
+ private LogAggregationReportProto convertToProtoFormat(
+ LogAggregationReport value) {
+ return ((LogAggregationReportPBImpl) value).getProto();
}
private void mergeLocalToProto() {
@@ -215,4 +246,54 @@ private void initNodeLabels() {
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet<String>(nodeLabels.getElementsList());
}
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ @Override
+ public Map<ApplicationId, LogAggregationReport>
+ getLogAggregationReportsForApps() {
+ if (this.logAggregationReportsForApps != null) {
+ return this.logAggregationReportsForApps;
+ }
+ initLogAggregationReportsForApps();
+ return logAggregationReportsForApps;
+ }
+
+ private void initLogAggregationReportsForApps() {
+ NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<LogAggregationReportsForAppsProto> list =
+ p.getLogAggregationReportsForAppsList();
+ this.logAggregationReportsForApps =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ for (LogAggregationReportsForAppsProto c : list) {
+ ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ LogAggregationReport report =
+ convertFromProtoFormat(c.getLogAggregationReport());
+ this.logAggregationReportsForApps.put(appId, report);
+ }
+ }
+
+ private LogAggregationReport convertFromProtoFormat(
+ LogAggregationReportProto logAggregationReport) {
+ return new LogAggregationReportPBImpl(logAggregationReport);
+ }
+
+ @Override
+ public void setLogAggregationReportsForApps(
+ Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
+ if (logAggregationStatusForApps == null
+ || logAggregationStatusForApps.isEmpty()) {
+ return;
+ }
+ maybeInitBuilder();
+ this.logAggregationReportsForApps =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+ }
}
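A sketch of attaching reports to a heartbeat request through the new accessors, assuming the PB implementations from yarn-common and yarn-server-common are on the classpath (identifiers are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.util.Records;

public class HeartbeatWithLogAggregationSketch {
  public static void main(String[] args) {
    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    NodeId nodeId = NodeId.newInstance("nm1.example.com", 45454);

    Map<ApplicationId, LogAggregationReport> reports =
        new HashMap<ApplicationId, LogAggregationReport>();
    reports.put(appId, LogAggregationReport.newInstance(appId, nodeId,
        LogAggregationStatus.RUNNING, ""));

    // The record factory resolves NodeHeartbeatRequestPBImpl by naming convention.
    NodeHeartbeatRequest request = Records.newRecord(NodeHeartbeatRequest.class);
    request.setLogAggregationReportsForApps(reports);
    System.out.println(request.getLogAggregationReportsForApps().size());
  }
}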
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
index ae4737d..d5a3dd8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
@@ -21,8 +21,10 @@
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.WEB_UI_TYPE;
+
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
+
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,6 +49,7 @@
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -154,7 +157,7 @@ public ApplicationReport run() throws Exception {
html.script().$type("text/javascript")._(script.toString())._();
}
- info("Application Overview")
+ ResponseInfo overviewTable = info("Application Overview")
._("User:", app.getUser())
._("Name:", app.getName())
._("Application Type:", app.getType())
@@ -181,8 +184,13 @@ public ApplicationReport run() throws Exception {
.getAppState() == YarnApplicationState.FINISHED
|| app.getAppState() == YarnApplicationState.FAILED
|| app.getAppState() == YarnApplicationState.KILLED ? "History"
- : "ApplicationMaster")
- ._("Diagnostics:",
+ : "ApplicationMaster");
+ if (webUiType != null
+ && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
+ overviewTable._("Log Aggregation Status",
+ root_url("logaggregationstatus", app.getAppId()), "Status");
+ }
+ overviewTable._("Diagnostics:",
app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
Collection<ApplicationAttemptReport> attempts;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d8c92c4..970123c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -50,6 +50,12 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
optional StringArrayProto nodeLabels = 4;
+ repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
+}
+
+message LogAggregationReportsForAppsProto {
+ optional ApplicationIdProto appId = 1;
+ optional LogAggregationReportProto log_aggregation_report = 2;
}
message NodeHeartbeatResponseProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 6e7e2ec..6f56296 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -77,4 +79,7 @@
boolean getDecommissioned();
void setDecommissioned(boolean isDecommissioned);
+
+ ConcurrentLinkedQueue<LogAggregationReport>
+ getLogAggregationStatusForApps();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index f95a7ad..5daea22 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,6 +47,7 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -382,6 +384,8 @@ public void run() {
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
private final NMStateStoreService stateStore;
private boolean isDecommissioned = false;
+ private final ConcurrentLinkedQueue<LogAggregationReport>
+ logAggregationReportForApps;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
@@ -395,6 +399,8 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
this.stateStore = stateStore;
+ this.logAggregationReportForApps = new ConcurrentLinkedQueue<
+ LogAggregationReport>();
}
/**
@@ -486,6 +492,12 @@ public void setSystemCrendentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials) {
this.systemCredentials = systemCredentials;
}
+
+ @Override
+ public ConcurrentLinkedQueue<LogAggregationReport>
+ getLogAggregationStatusForApps() {
+ return this.logAggregationReportForApps;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 2549e0f..d465f36 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Random;
import java.util.Set;
@@ -48,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -73,6 +75,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@@ -115,6 +118,8 @@
// Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers;
+ private boolean logAggregationEnabled;
+
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -193,6 +198,10 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.info("Initialized nodemanager for " + nodeId + ":" +
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" virtual-cores=" + virtualCores);
+
+ this.logAggregationEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
}
@Override
@@ -649,6 +658,18 @@ public void run() {
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat);
+
+ if (logAggregationEnabled) {
+ // pull log aggregation status for application running in this NM
+ Map<ApplicationId, LogAggregationReport> logAggregationReports =
+ getLogAggregationReportsForApps(context
+ .getLogAggregationStatusForApps());
+ if (logAggregationReports != null
+ && !logAggregationReports.isEmpty()) {
+ request.setLogAggregationReportsForApps(logAggregationReports);
+ }
+ }
+
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -782,6 +803,38 @@ private void updateMasterKeys(NodeHeartbeatResponse response) {
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
}
-
-
+
+ private Map<ApplicationId, LogAggregationReport>
+ getLogAggregationReportsForApps(ConcurrentLinkedQueue<
+ LogAggregationReport> latestLogAggregationStatus) {
+ Map<ApplicationId, LogAggregationReport> latestLogAggregationReports =
+ new HashMap<ApplicationId, LogAggregationReport>();
+ LogAggregationReport status;
+ while ((status = latestLogAggregationStatus.poll()) != null) {
+ LogAggregationReport report = null;
+ if (!latestLogAggregationReports.containsKey(status.getApplicationId())) {
+ report = Records.newRecord(LogAggregationReport.class);
+ report.setApplicationId(status.getApplicationId());
+ report.setNodeId(this.nodeId);
+ report.setLogAggregationStatus(status.getLogAggregationStatus());
+ report.setDiagnosticMessage(status.getDiagnosticMessage());
+ } else {
+ report = latestLogAggregationReports.get(status.getApplicationId());
+ report.setLogAggregationStatus(status.getLogAggregationStatus());
+ String message = report.getDiagnosticMessage();
+ if (status.getDiagnosticMessage() != null &&
+ !status.getDiagnosticMessage().isEmpty()) {
+ if (message != null) {
+ message += status.getDiagnosticMessage();
+ } else {
+ message = status.getDiagnosticMessage();
+ }
+ report.setDiagnosticMessage(message);
+ }
+ }
+ latestLogAggregationReports.put(
+ status.getApplicationId(), report);
+ }
+ return latestLogAggregationReports;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 393576b..d5c24d4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -49,6 +49,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -63,6 +65,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
@@ -292,12 +296,14 @@ private void uploadLogsForContainers(boolean appFinished) {
writer.close();
}
+ long currentTime = System.currentTimeMillis();
final Path renamedPath = this.rollingMonitorInterval <= 0
? remoteNodeLogFileForApp : new Path(
remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_"
- + System.currentTimeMillis());
+ + currentTime);
+ String diagnosticMessage = "";
final boolean rename = uploadedLogsInThisCycle;
try {
+ userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -314,12 +320,34 @@ public Object run() throws Exception {
return null;
}
});
+ diagnosticMessage =
+ "Log uploaded successfully for Application: " + appId
+ + " in NodeManager: "
+ + LogAggregationUtils.getNodeString(nodeId) + " at "
+ + Times.format(currentTime) + "\n";
} catch (Exception e) {
LOG.error(
"Failed to move temporary log file to final location: ["
+ remoteNodeTmpLogFileForApp + "] to ["
+ renamedPath + "]", e);
+ diagnosticMessage =
+ "Log uploaded failed for Application: " + appId
+ + " in NodeManager: "
+ + LogAggregationUtils.getNodeString(nodeId) + " at "
+ + Times.format(currentTime) + "\n";
}
+
+ LogAggregationReport report =
+ Records.newRecord(LogAggregationReport.class);
+ report.setApplicationId(appId);
+ report.setNodeId(nodeId);
+ report.setDiagnosticMessage(diagnosticMessage);
+ if (appFinished) {
+ report.setLogAggregationStatus(LogAggregationStatus.FINISHED);
+ } else {
+ report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
+ }
+ this.context.getLogAggregationStatusForApps().add(report);
} finally {
if (writer != null) {
writer.close();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 22efe25..5e2dc7e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -458,10 +458,16 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
// 4. Send status to RMNode, saving the latest response.
- this.rmContext.getDispatcher().getEventHandler().handle(
+ RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
- remoteNodeStatus.getContainersStatuses(),
- remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+ remoteNodeStatus.getContainersStatuses(),
+ remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
+ if (request.getLogAggregationReportsForApps() != null
+ && !request.getLogAggregationReportsForApps().isEmpty()) {
+ nodeStatusEvent.setLogAggregationReportsForApps(request
+ .getLogAggregationReportsForApps());
+ }
+ this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
// 5. Update node's labels to RM's NodeLabelManager.
if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fbcaab9..697ff18 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -242,4 +243,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
ReservationId getReservationId();
ResourceRequest getAMResourceRequest();
+
+ Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d1737a..b3254d2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -25,9 +25,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,6 +50,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -142,6 +146,12 @@
new AppFinishedTransition();
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
+ private final boolean logAggregationEnabled;
+ private long logAggregationStartTime = 0;
+ private final long logAggregationStatusTimeout;
+ private final Map<NodeId, LogAggregationReport> logAggregationStatus =
+ new HashMap<NodeId, LogAggregationReport>();
+
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
private RMAppState stateBeforeFinalSaving;
@@ -413,6 +423,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
+
+ long localLogAggregationStatusTimeout =
+ conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+ if (localLogAggregationStatusTimeout <= 0) {
+ this.logAggregationStatusTimeout =
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+ } else {
+ this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
+ }
+ this.logAggregationEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
}
@Override
@@ -803,6 +826,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
// otherwise, add it to ranNodes for further process
app.ranNodes.add(nodeAddedEvent.getNodeId());
+
+ app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
+ LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
+ .getNodeId(), app.logAggregationEnabled
+ ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
+ ""));
};
}
@@ -1153,6 +1182,7 @@ public FinalTransition(RMAppState finalState) {
}
public void transition(RMAppImpl app, RMAppEvent event) {
+ app.logAggregationStartTime = System.currentTimeMillis();
for (NodeId nodeId : app.getRanNodes()) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
@@ -1356,4 +1386,62 @@ protected Credentials parseCredentials() throws IOException {
}
return credentials;
}
+
+ @Override
+ public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ try {
+ this.readLock.lock();
+ Map<NodeId, LogAggregationReport> outputs =
+ new HashMap<NodeId, LogAggregationReport>();
+ outputs.putAll(logAggregationStatus);
+ for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+ if (!output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.TIME_OUT)
+ && !output.getValue().getLogAggregationStatus()
+ .equals(LogAggregationStatus.FINISHED)
+ && isAppInFinalState(this)
+ && System.currentTimeMillis() > this.logAggregationStartTime
+ + this.logAggregationStatusTimeout) {
+ output.getValue().setLogAggregationStatus(
+ LogAggregationStatus.TIME_OUT);
+ }
+ }
+ return outputs;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
+ try {
+ this.writeLock.lock();
+ if (this.logAggregationEnabled) {
+ LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+ if (curReport == null) {
+ this.logAggregationStatus.put(nodeId, report);
+ } else {
+ if (curReport.getLogAggregationStatus().equals(
+ LogAggregationStatus.TIME_OUT)) {
+ if (report.getLogAggregationStatus().equals(
+ LogAggregationStatus.FINISHED)) {
+ curReport.setLogAggregationStatus(report
+ .getLogAggregationStatus());
+ }
+ } else {
+ curReport.setLogAggregationStatus(report.getLogAggregationStatus());
+ }
+
+ if (report.getDiagnosticMessage() != null
+ && !report.getDiagnosticMessage().isEmpty()) {
+ curReport
+ .setDiagnosticMessage(curReport.getDiagnosticMessage() == null
+ ? report.getDiagnosticMessage() : curReport
+ .getDiagnosticMessage() + report.getDiagnosticMessage());
+ }
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
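The TIME_OUT decision in getLogAggregationReportsForApp() above can be restated as a small predicate; this is an illustrative restatement of the patch's rule, not code from the patch:

import org.apache.hadoop.yarn.api.records.LogAggregationStatus;

public class LogAggregationTimeoutRule {
  // A node's report is flipped to TIME_OUT only when the app is in a final state,
  // the configured timeout has elapsed since aggregation started, and the node has
  // not already reported FINISHED (or already been marked TIME_OUT).
  static boolean shouldMarkTimedOut(LogAggregationStatus current,
      boolean appInFinalState, long aggregationStartTime, long timeoutMs, long now) {
    return current != LogAggregationStatus.TIME_OUT
        && current != LogAggregationStatus.FINISHED
        && appInFinalState
        && now > aggregationStartTime + timeoutMs;
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis() - 11 * 60 * 1000;
    System.out.println(shouldMarkTimedOut(LogAggregationStatus.RUNNING, true,
        start, 10 * 60 * 1000, System.currentTimeMillis()));
  }
}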
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index c556b80..db5233c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -22,6 +22,8 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -39,6 +41,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -243,7 +247,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.stateMachine = stateMachineFactory.make(this);
- this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
+ this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
}
@Override
@@ -773,6 +777,13 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.handleContainerStatus(statusEvent.getContainers());
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+ statusEvent.getLogAggregationReportsForApps();
+ if (logAggregationReportsForApps != null
+ && !logAggregationReportsForApps.isEmpty()) {
+ rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
+ }
+
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
@@ -903,4 +914,15 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
}
}
+ private void handleLogAggregationStatus(
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+ for (Entry<ApplicationId, LogAggregationReport> report :
+ logAggregationReportsForApps.entrySet()) {
+ RMApp rmApp = this.context.getRMApps().get(report.getKey());
+ if (rmApp != null) {
+ ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue());
+ }
+ }
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index abfacbb..17fddbe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -32,6 +34,7 @@
private final List<ContainerStatus> containersCollection;
private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
+ private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -41,6 +44,19 @@ public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
+ this.logAggregationReportsForApps = null;
+ }
+
+ public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
+ List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+ NodeHeartbeatResponse latestResponse,
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+ super(nodeId, RMNodeEventType.STATUS_UPDATE);
+ this.nodeHealthStatus = nodeHealthStatus;
+ this.containersCollection = collection;
+ this.keepAliveAppIds = keepAliveAppIds;
+ this.latestResponse = latestResponse;
+ this.logAggregationReportsForApps = logAggregationReportsForApps;
}
public NodeHealthStatus getNodeHealthStatus() {
@@ -58,4 +74,14 @@ public NodeHeartbeatResponse getLatestResponse() {
public List<ApplicationId> getKeepAliveAppIds() {
return this.keepAliveAppIds;
}
+
+ public Map<ApplicationId, LogAggregationReport>
+ getLogAggregationReportsForApps() {
+ return this.logAggregationReportsForApps;
+ }
+
+ public void setLogAggregationReportsForApps(
+ Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+ this.logAggregationReportsForApps = logAggregationReportsForApps;
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
new file mode 100644
index 0000000..78d6557
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+
+public class AppLogAggregationStatusPage extends RmView {
+
+ @Override
+ protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+ String appId = $(YarnWebParams.APPLICATION_ID);
+ set(
+ TITLE,
+ appId.isEmpty() ? "Bad request: missing application ID" : join(
+ "Application ", $(YarnWebParams.APPLICATION_ID)));
+ }
+
+ @Override
+ protected Class<? extends SubView> content() {
+ return RMAppLogAggregationStatusBlock.class;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
new file mode 100644
index 0000000..4000573
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class RMAppLogAggregationStatusBlock extends HtmlBlock {
+
+ private static final Log LOG = LogFactory
+ .getLog(RMAppLogAggregationStatusBlock.class);
+ private final ResourceManager rm;
+ private final Configuration conf;
+
+ @Inject
+ RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm,
+ Configuration conf) {
+ super(ctx);
+ this.rm = rm;
+ this.conf = conf;
+ }
+
+ @Override
+ protected void render(Block html) {
+ String aid = $(APPLICATION_ID);
+ if (aid.isEmpty()) {
+ puts("Bad request: requires Application ID");
+ return;
+ }
+
+ ApplicationId appId;
+ try {
+ appId = Apps.toAppID(aid);
+ } catch (Exception e) {
+ puts("Invalid Application ID: " + aid);
+ return;
+ }
+
+ setTitle(join("Application ", aid));
+
+ boolean logAggregationEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ // Application Log aggregation status Table
+    DIV<Hamlet> div = html.div(_INFO_WRAP);
+    TABLE<DIV<Hamlet>> table =
+ div.h3(
+ "Log Aggregation: "
+ + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+ "#LogAggregationStatus");
+ table.
+ tr().
+ th(_TH, "NodeId").
+ th(_TH, "Log Aggregation Status").
+        th(_TH, "Diagnostic Message").
+ _();
+
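+    // Render one table row per NodeManager that has a log aggregation report
+    // for this application; missing status or diagnostics are shown as "N/A".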
+ RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+ if (rmApp != null) {
+      Map<NodeId, LogAggregationReport> logAggregationReports =
+          rmApp.getLogAggregationReportsForApp();
+ if (logAggregationReports != null && !logAggregationReports.isEmpty()) {
+        for (Entry<NodeId, LogAggregationReport> report :
+ logAggregationReports.entrySet()) {
+ LogAggregationStatus status =
+ report.getValue() == null ? null : report.getValue()
+ .getLogAggregationStatus();
+ String message =
+ report.getValue() == null ? null : report.getValue()
+ .getDiagnosticMessage();
+ table.tr()
+ .td(report.getKey().toString())
+ .td(status == null ? "N/A" : status.toString())
+ .td(message == null ? "N/A" : message)._();
+ }
+ }
+ }
+ table._();
+ div._();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
index 86300ce..a86ed4f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
@@ -68,6 +68,8 @@ public void setup() {
"appattempt");
route(pajoin("/container", CONTAINER_ID), RmController.class, "container");
route("/errors-and-warnings", RmController.class, "errorsAndWarnings");
+ route(pajoin("/logaggregationstatus", APPLICATION_ID),
+ RmController.class, "logaggregationstatus");
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
index c8e3c5b..b124d75 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
@@ -109,4 +109,8 @@ public void nodelabels() {
public void errorsAndWarnings() {
render(RMErrorsAndWarningsPage.class);
}
+
+ public void logaggregationstatus() {
+ render(AppLogAggregationStatusPage.class);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index f8d92aa..c83ad1d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -190,6 +191,11 @@ public ReservationId getReservationId() {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ @Override
+    public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
public static RMApp newApplication(int i) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
new file mode 100644
index 0000000..a2bc3f2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestRMAppLogAggregationStatus {
+
+ private RMContext rmContext;
+ private YarnScheduler scheduler;
+
+ private SchedulerEventType eventType;
+
+ private ApplicationId appId;
+
+
+ private final class TestSchedulerEventDispatcher implements
+      EventHandler<SchedulerEvent> {
+ @Override
+ public void handle(SchedulerEvent event) {
+ scheduler.handle(event);
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
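+    // InlineDispatcher delivers events synchronously in the calling thread,
+    // so node and app state transitions are complete when handle() returns.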
+ InlineDispatcher rmDispatcher = new InlineDispatcher();
+
+ rmContext =
+ new RMContextImpl(rmDispatcher, null, null, null,
+ null, null, null, null, null,
+ new RMApplicationHistoryWriter());
+ rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+
+ scheduler = mock(YarnScheduler.class);
+ doAnswer(
+        new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
+ eventType = event.getType();
+ if (eventType == SchedulerEventType.NODE_UPDATE) {
+ //DO NOTHING
+ }
+ return null;
+ }
+ }
+ ).when(scheduler).handle(any(SchedulerEvent.class));
+
+ rmDispatcher.register(SchedulerEventType.class,
+ new TestSchedulerEventDispatcher());
+
+ appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testLogAggregationStatus() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
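+    // Use a short status timeout so the TIME_OUT transition can be observed
+    // after the application is killed later in this test.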
+ conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, 1500);
+ RMApp rmApp = createRMApp(conf);
+ this.rmContext.getRMApps().put(appId, rmApp);
+ rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.START));
+ rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_NEW_SAVED));
+ rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_ACCEPTED));
+
+ // This application will be running on two nodes
+ NodeId nodeId1 = NodeId.newInstance("localhost", 1234);
+ Resource capability = Resource.newInstance(4096, 4);
+ RMNodeImpl node1 =
+ new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
+ node1.handle(new RMNodeStartedEvent(nodeId1, null, null));
+ rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
+
+ NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
+ RMNodeImpl node2 =
+ new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
+ node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null));
+ rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
+
+ // The initial log aggregation status for these two nodes
+    // should be NOT_START
+    Map<NodeId, LogAggregationReport> logAggregationStatus =
+ rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ Assert.assertEquals(LogAggregationStatus.NOT_START, report.getValue()
+ .getLogAggregationStatus());
+ }
+
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode1_1 =
+ "node1 logAggregation status updated at " + System.currentTimeMillis();
+ LogAggregationReport report1 =
+ LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationStatus.RUNNING, messageForNode1_1);
+ node1ReportForApp.put(appId, report1);
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node1ReportForApp));
+
+    Map<ApplicationId, LogAggregationReport> node2ReportForApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode2_1 =
+ "node2 logAggregation status updated at " + System.currentTimeMillis();
+ LogAggregationReport report2 =
+ LogAggregationReport.newInstance(appId, nodeId2,
+ LogAggregationStatus.RUNNING, messageForNode2_1);
+ node2ReportForApp.put(appId, report2);
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node2ReportForApp));
+    // node1 and node2 have updated their log aggregation status;
+    // verify that the log aggregation reports for node1 and node2
+    // have been updated accordingly
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ if (report.getKey().equals(node1.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode1_1, report.getValue()
+ .getDiagnosticMessage());
+ } else if (report.getKey().equals(node2.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode2_1, report.getValue()
+ .getDiagnosticMessage());
+ } else {
+ // should not contain log aggregation report for other nodes
+ Assert
+ .fail("should not contain log aggregation report for other nodes");
+ }
+ }
+
+ // node1 updates its log aggregation status again
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
+        new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode1_2 =
+ "node1 logAggregation status updated at " + System.currentTimeMillis();
+ LogAggregationReport report1_2 =
+ LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationStatus.RUNNING, messageForNode1_2);
+ node1ReportForApp2.put(appId, report1_2);
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node1ReportForApp2));
+
+ // verify that the log aggregation status for node1
+ // has been changed
+ // verify that the log aggregation status for node2
+ // does not change
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ if (report.getKey().equals(node1.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
+ .getValue().getDiagnosticMessage());
+ } else if (report.getKey().equals(node2.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode2_1, report.getValue()
+ .getDiagnosticMessage());
+ } else {
+ // should not contain log aggregation report for other nodes
+ Assert
+ .fail("should not contain log aggregation report for other nodes");
+ }
+ }
+
+ // kill the application
+ rmApp.handle(new RMAppEvent(appId, RMAppEventType.KILL));
+ rmApp.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_KILLED));
+ rmApp.handle(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED));
+ Assert.assertEquals(RMAppState.KILLED, rmApp.getState());
+
+ // wait for 1500 ms
+ Thread.sleep(1500);
+
+ // the log aggregation status for both nodes should be changed
+ // to TIME_OUT
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+ .getLogAggregationStatus());
+ }
+
+ // Finally, node1 finished its log aggregation and sent out its final
+ // log aggregation status. The log aggregation status for node1 should
+    // be changed from TIME_OUT to FINISHED
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
+        new HashMap<ApplicationId, LogAggregationReport>();
+ String messageForNode1_3 =
+ "node1 final logAggregation status updated at "
+ + System.currentTimeMillis();
+ LogAggregationReport report1_3 =
+ LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationStatus.FINISHED, messageForNode1_3);
+ node1ReportForApp3.put(appId, report1_3);
+ node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+ null, node1ReportForApp3));
+
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertEquals(2, logAggregationStatus.size());
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+ Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+ .entrySet()) {
+ if (report.getKey().equals(node1.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue()
+ .getLogAggregationStatus());
+ Assert.assertEquals(messageForNode1_1 + messageForNode1_2
+ + messageForNode1_3, report.getValue().getDiagnosticMessage());
+ } else if (report.getKey().equals(node2.getNodeID())) {
+ Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+ .getLogAggregationStatus());
+ } else {
+ // should not contain log aggregation report for other nodes
+ Assert
+ .fail("should not contain log aggregation report for other nodes");
+ }
+ }
+ }
+
+ private RMApp createRMApp(Configuration conf) {
+ ApplicationSubmissionContext submissionContext =
+ ApplicationSubmissionContext.newInstance(appId, "test", "default",
+ Priority.newInstance(0), null, false, true,
+ 2, Resource.newInstance(10, 2), "test");
+ return new RMAppImpl(this.appId, this.rmContext,
+ conf, "test", "test", "default", submissionContext,
+ this.rmContext.getScheduler(),
+ this.rmContext.getApplicationMasterService(),
+ System.currentTimeMillis(), "test",
+ null, null);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ec990f9..a6efd17 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -271,4 +272,9 @@ public ReservationId getReservationId() {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ @Override
+  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}