diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 729e043..9537a48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -66,4 +68,7 @@ LocalDirsHandlerService getLocalDirsHandler(); ApplicationACLsManager getApplicationACLsManager(); + + /** Per-application log aggregation results awaiting the next node status. */ + ConcurrentLinkedQueue<ApplicationLogStatus> getAppLogState(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a169c12..9282b1b 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -242,6 +244,9 @@ public void run() { new ConcurrentHashMap(); private final ConcurrentMap containers = new ConcurrentSkipListMap(); + /** Log aggregation results not yet drained into a node status. */ + private final ConcurrentLinkedQueue<ApplicationLogStatus> appLogStatus = + new ConcurrentLinkedQueue<ApplicationLogStatus>(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -328,6 +333,11 @@ public LocalDirsHandlerService getLocalDirsHandler() { public ApplicationACLsManager getApplicationACLsManager() { return aclsManager; } + + @Override + public ConcurrentLinkedQueue<ApplicationLogStatus> getAppLogState() { + return appLogStatus; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8a06418..c55b925 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -374,7 +375,16 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { List keepAliveAppIds = createKeepAliveApplicationList(); nodeStatus.setKeepAliveApplications(keepAliveAppIds); - + + /* Move every queued log aggregation result into this node status. */ + /* poll() doubles as the emptiness check, so a null is never added. */ + List<ApplicationLogStatus> appLogStatus = + new ArrayList<ApplicationLogStatus>(); + ApplicationLogStatus finishedAppStatus; + while ((finishedAppStatus = context.getAppLogState().poll()) != null) { + appLogStatus.add(finishedAppStatus); + } + nodeStatus.setApplicationLogStatus(appLogStatus); return nodeStatus; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6ef7944..b4d115d 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -36,11 +37,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.server.api.records.ApplicationLogStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -71,7 +75,13 @@ private final AtomicBoolean appFinishing = new AtomicBoolean(); private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final Map appAcls; - + private final Context context; + /** Aggregation outcome recorded per container handled by this aggregator. */ + private final Map<ContainerId, LogAggregationState> containersLogStatus; + /** Failure details accumulated for the application's final status. */ + private final StringBuilder diagnostic; + /** Set on any failure; turns the application's final state into FAILED. */ + private final AtomicBoolean containerLogAggregationFail; private LogWriter writer = null; public 
AppLogAggregatorImpl(Dispatcher dispatcher, @@ -79,7 +89,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, - Map appAcls) { + Map appAcls, Context context) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -92,11 +102,16 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.retentionPolicy = retentionPolicy; this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; + this.context = context; + this.diagnostic = new StringBuilder(); + this.containersLogStatus = new HashMap<ContainerId, LogAggregationState>(); + this.containerLogAggregationFail = new AtomicBoolean(false); } private void uploadLogsForContainer(ContainerId containerId) { if (this.logAggregationDisabled) { + containersLogStatus.put(containerId, LogAggregationState.DISABLED); return; } @@ -115,6 +130,12 @@ private void uploadLogsForContainer(ContainerId containerId) { LOG.error("Cannot create writer for app " + this.applicationId + ". Disabling log-aggregation for this app.", e); this.logAggregationDisabled = true; + containersLogStatus.put(containerId, LogAggregationState.DISABLED); + diagnostic.append("Can not create writer for this app " + + this.applicationId + + ". Disabled log-aggregation for this app at this Node " + + context.getNodeId() + "\n"); + containerLogAggregationFail.set(true); return; } } @@ -128,9 +149,14 @@ private void uploadLogsForContainer(ContainerId containerId) { userUgi.getShortUserName()); try { this.writer.append(logKey, logValue); + containersLogStatus.put(containerId, LogAggregationState.COMPLETED); } catch (IOException e) { LOG.error("Couldn't upload logs for " + containerId + ". 
Skipping this container."); + containersLogStatus.put(containerId, LogAggregationState.FAILED); + diagnostic.append("Couldn't upload logs for " + containerId + + " at this Node " + context.getNodeId() + "\n"); + containerLogAggregationFail.set(true); } } @@ -192,11 +218,21 @@ public Object run() throws Exception { } }); } catch (Exception e) { - LOG.error("Failed to move temporary log file to final location: [" + String errorMessage = "Failed to move temporary log file to final location: [" + remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp - + "]", e); + + "]"; + LOG.error(errorMessage, e); + diagnostic.append(errorMessage + " at node " + context.getNodeId() + "\n"); + containerLogAggregationFail.set(true); + } + LogAggregationState appLogState = LogAggregationState.COMPLETED; + if(containerLogAggregationFail.get()) { + appLogState = LogAggregationState.FAILED; } - + /* Queue the final result for the node status updater to report. */ + this.context.getAppLogState().add( + ApplicationLogStatus.newInstance(this.appId, appLogState, + containersLogStatus, diagnostic.toString())); this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index efe8984..806d5fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -326,7 +326,7 @@ protected void initAppAggregator(final ApplicationId appId, String user, new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, - appAcls); + appAcls, context); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); }