diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 729e043..b05c40d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -66,4 +68,6 @@ LocalDirsHandlerService getLocalDirsHandler(); ApplicationACLsManager getApplicationACLsManager(); + /** Returns the queue of application log-aggregation statuses: ApplicationImpl appends to it and NodeStatusUpdaterImpl drains it into the NodeStatus it builds. */ + ConcurrentLinkedQueue<ApplicationLogAggregationStatus> getAppLogState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a169c12..c69b19b 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -242,6 +244,8 @@ public void run() { new ConcurrentHashMap(); private final ConcurrentMap containers = new ConcurrentSkipListMap(); + private final ConcurrentLinkedQueue<ApplicationLogAggregationStatus> appLogStatus = /* element type made explicit instead of a raw queue */ + new ConcurrentLinkedQueue<ApplicationLogAggregationStatus>(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -328,6 +332,11 @@ public LocalDirsHandlerService getLocalDirsHandler() { public ApplicationACLsManager getApplicationACLsManager() { return aclsManager; } + /** Returns this context's appLogStatus queue; the field is final, so every call yields the same instance. */ + @Override + public ConcurrentLinkedQueue<ApplicationLogAggregationStatus> getAppLogState() { + return appLogStatus; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8a06418..21afc15 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -374,7 +375,13 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { List keepAliveAppIds = createKeepAliveApplicationList(); nodeStatus.setKeepAliveApplications(keepAliveAppIds); - + /* Drain the queue with poll() alone: a separate peek()-then-poll() could add a null entry if the queue were emptied between the two calls. */ + List appLogStatus = new ArrayList(); + Object logStatus; + while ((logStatus = context.getAppLogState().poll()) != null) { + appLogStatus.add(logStatus); + } + nodeStatus.setApplicationLogAggregationStatus(appLogStatus); return nodeStatus; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 21d2f91..0a4fda8 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -423,6 +423,12 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { SingleArcTransition { @Override public void transition(ApplicationImpl app, ApplicationEvent event) { + if (event instanceof ApplicationLogHandlingFinishedEvent) { + ApplicationLogHandlingFinishedEvent finishedEvent = + (ApplicationLogHandlingFinishedEvent) event; + app.context.getAppLogState().add( + finishedEvent.getApplicationLogAggregationStatus()); + } ApplicationId appId = event.getApplicationID(); app.context.getApplications().remove(appId); app.aclsManager.removeApplication(appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationLogHandlingFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationLogHandlingFinishedEvent.java new file mode 100644 index 0000000..3e9c6fc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationLogHandlingFinishedEvent.java @@ -0,0 +1,36 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; +/** Application event of type APPLICATION_LOG_HANDLING_FINISHED that additionally carries an ApplicationLogAggregationStatus. */ +public class ApplicationLogHandlingFinishedEvent extends ApplicationEvent { + /** Status handed to the constructor; assigned once and never changed. */ + private final ApplicationLogAggregationStatus logAggregationStatus; + /** Creates the event for appID and stores status exactly as given (no null check is performed). */ + public ApplicationLogHandlingFinishedEvent(ApplicationId appID, + ApplicationLogAggregationStatus status) { + super(appID, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED); + this.logAggregationStatus = status; + } + /** Returns the status object supplied at construction. */ + public ApplicationLogAggregationStatus getApplicationLogAggregationStatus() { + return logAggregationStatus; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6ef7944..1ac1e67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -36,15 +37,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogAggregationStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationLogHandlingFinishedEvent; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -71,7 +74,10 @@ private final AtomicBoolean appFinishing = new AtomicBoolean(); private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final Map appAcls; - + private final Context context; + private final Map containersLogStatus; + private final StringBuilder diagnostic; + private boolean 
containerLogAggregationFail = false; private LogWriter writer = null; public AppLogAggregatorImpl(Dispatcher dispatcher, @@ -79,7 +85,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, - Map appAcls) { + Map appAcls, Context context) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -92,11 +98,15 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.retentionPolicy = retentionPolicy; this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; + this.context = context; + this.diagnostic = new StringBuilder(); + this.containersLogStatus = new HashMap(); } private void uploadLogsForContainer(ContainerId containerId) { if (this.logAggregationDisabled) { + containersLogStatus.put(containerId, LogAggregationState.DISABLED); return; } @@ -115,6 +125,12 @@ private void uploadLogsForContainer(ContainerId containerId) { LOG.error("Cannot create writer for app " + this.applicationId + ". Disabling log-aggregation for this app.", e); this.logAggregationDisabled = true; + containersLogStatus.put(containerId, LogAggregationState.DISABLED); + diagnostic.append("Can not create writer for this app " + + this.applicationId + + ". Disabled log-aggregation for this app at this Node " + + context.getNodeId() + "\n"); + containerLogAggregationFail = true; return; } } @@ -128,9 +144,14 @@ private void uploadLogsForContainer(ContainerId containerId) { userUgi.getShortUserName()); try { this.writer.append(logKey, logValue); + containersLogStatus.put(containerId, LogAggregationState.COMPLETED); } catch (IOException e) { LOG.error("Couldn't upload logs for " + containerId + ". 
Skipping this container."); + containersLogStatus.put(containerId, LogAggregationState.FAILED); + diagnostic.append("Couldn't upload logs for " + containerId + + " at this Node " + context.getNodeId() + "\n"); /* leading space keeps the container id and "at" apart */ + containerLogAggregationFail = true; } } @@ -192,14 +213,22 @@ public Object run() throws Exception { } }); } catch (Exception e) { - LOG.error("Failed to move temporary log file to final location: [" + String errorMessage = "Failed to move temporary log file to final location: [" + remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp - + "]", e); + + "]"; + LOG.error(errorMessage, e); + diagnostic.append(errorMessage + " at node " + context.getNodeId() + "\n"); /* leading space separates the closing "]" from "at" */ + containerLogAggregationFail = true; + } + LogAggregationState appLogState = LogAggregationState.COMPLETED; /* downgraded to FAILED below if any failure flag was raised */ + if(containerLogAggregationFail) { + appLogState = LogAggregationState.FAILED; } - + ApplicationLogAggregationStatus status = + ApplicationLogAggregationStatus.newInstance(appId, appLogState, + containersLogStatus, diagnostic.toString()); this.dispatcher.getEventHandler().handle( - new ApplicationEvent(this.appId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + new ApplicationLogHandlingFinishedEvent(this.appId, status)); this.appAggregationFinished.set(true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index efe8984..806d5fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -326,7 +326,7 @@ protected void initAppAggregator(final ApplicationId appId, String user, new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, - appAcls); + appAcls, context); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); }