diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index fc30a80..ff50330 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -112,4 +112,20 @@ public abstract void setRunningApplications(
    * @param physicalResource Physical resources in the node.
    */
   public abstract void setPhysicalResource(Resource physicalResource);
+
+  /**
+   * Get the log aggregation reports carried by this registration request.
+   *
+   * @return the log aggregation reports for the applications.
+   */
+  public abstract List<LogAggregationReport> getLogAggregationReportsForApps();
+
+  /**
+   * Set the log aggregation reports carried by this registration request.
+   *
+   * @param logAggregationReportsForApps
+   *          the log aggregation reports for the applications.
+   */
+  public abstract void setLogAggregationReportsForApps(
+      List<LogAggregationReport> logAggregationReportsForApps);
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index eda06d0..f1d7339 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -38,11 +38,13 @@
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 
@@ -57,6 +59,8 @@
   private List runningApplications = null;
   private Set labels = null;
 
+  private List<LogAggregationReport> logAggregationReportsForApps = null;
+
   /** Physical resources in the node.
*/
   private Resource physicalResource = null;
 
@@ -100,6 +104,55 @@ private synchronized void mergeLocalToBuilder() {
     if (this.physicalResource != null) {
       builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
     }
+    if (this.logAggregationReportsForApps != null) {
+      addLogAggregationStatusForAppsToProto();
+    }
+  }
+
+  /**
+   * Replaces the log aggregation reports held by the builder with the
+   * locally cached list, converting each record to its proto form.
+   */
+  private synchronized void addLogAggregationStatusForAppsToProto() {
+    maybeInitBuilder();
+    builder.clearLogAggregationReportsForApps();
+    if (this.logAggregationReportsForApps == null) {
+      return;
+    }
+    // Wrap the records in an Iterable that converts each one to its proto
+    // form as the iterator is advanced.
+    Iterable<LogAggregationReportProto> it =
+        new Iterable<LogAggregationReportProto>() {
+          @Override
+          public Iterator<LogAggregationReportProto> iterator() {
+            return new Iterator<LogAggregationReportProto>() {
+              private Iterator<LogAggregationReport> iter =
+                  logAggregationReportsForApps.iterator();
+
+              @Override
+              public boolean hasNext() {
+                return iter.hasNext();
+              }
+
+              @Override
+              public LogAggregationReportProto next() {
+                return convertToProtoFormat(iter.next());
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        };
+    builder.addAllLogAggregationReportsForApps(it);
+  }
+
+  /** Unwraps the proto backing a {@link LogAggregationReport} record. */
+  private LogAggregationReportProto convertToProtoFormat(
+      LogAggregationReport value) {
+    return ((LogAggregationReportPBImpl) value).getProto();
   }
 
   private synchronized void addNMContainerStatusesToProto() {
@@ -400,4 +453,51 @@ private static NMContainerStatusProto convertToProtoFormat(
       NMContainerStatus c) {
     return ((NMContainerStatusPBImpl)c).getProto();
   }
+
+  /**
+   * Returns the log aggregation reports, decoding them from the underlying
+   * proto or builder on first access.
+   */
+  @Override
+  public synchronized List<LogAggregationReport>
+      getLogAggregationReportsForApps() {
+    if (this.logAggregationReportsForApps != null) {
+      return this.logAggregationReportsForApps;
+    }
+    initLogAggregationReportsForApps();
+    return logAggregationReportsForApps;
+  }
+
+  /** Populates the local report list from the proto or the builder. */
+  private synchronized void initLogAggregationReportsForApps() {
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<LogAggregationReportProto> list =
+        p.getLogAggregationReportsForAppsList();
+    this.logAggregationReportsForApps = new ArrayList<>();
+    for (LogAggregationReportProto c : list) {
+      this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
+    }
+  }
+
+  /** Wraps a report proto in its record implementation. */
+  private LogAggregationReport convertFromProtoFormat(
+      LogAggregationReportProto logAggregationReport) {
+    return new LogAggregationReportPBImpl(logAggregationReport);
+  }
+
+  /**
+   * Sets the log aggregation reports; {@code null} clears any reports already
+   * stored in the builder.
+   */
+  @Override
+  public synchronized void setLogAggregationReportsForApps(
+      List<LogAggregationReport> logAggregationStatusForApps) {
+    if (logAggregationStatusForApps == null) {
+      // Make sure the builder is usable before mutating it, exactly as
+      // addLogAggregationStatusForAppsToProto() does.
+      maybeInitBuilder();
+      builder.clearLogAggregationReportsForApps();
+    }
+    this.logAggregationReportsForApps = logAggregationStatusForApps;
+  }
 }
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 8e59f14..e9598af 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -66,6 +66,7 @@ message RegisterNodeManagerRequestProto {
   repeated ApplicationIdProto runningApplications = 7;
   optional NodeLabelsProto nodeLabels = 8;
   optional ResourceProto physicalResource = 9;
+  repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
 }
 
 message RegisterNodeManagerResponseProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 33cefea..31fd6dd 100644
---
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -33,7 +33,8 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@@ -122,4 +123,10 @@
   void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
 
   NMTimelinePublisher getNMTimelinePublisher();
+
+  /**
+   * @return the tracker that caches per-application log aggregation status;
+   *         may be {@code null} when the context carries no tracker.
+   */
+  NMLogAggregationStatusTracker getNMLogAggregationStatusTracker();
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 431a894..5f31d09 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import org.apache.hadoop.yarn.state.MultiStateTransitionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -596,6 +597,8 @@ protected void reregisterCollectors() {
 
     private NMTimelinePublisher nmTimelinePublisher;
 
+    private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -796,6 +799,21 @@ public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
     public NMTimelinePublisher getNMTimelinePublisher() {
       return nmTimelinePublisher;
     }
+
+    /**
+     * Set the tracker returned by {@link #getNMLogAggregationStatusTracker()}.
+     *
+     * @param nmLogAggregationStatusTracker the tracker to expose
+     */
+    public void setNMLogAggregationStatusTracker(
+        NMLogAggregationStatusTracker nmLogAggregationStatusTracker) {
+      this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker;
+    }
+
+    @Override
+    public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
+      return nmLogAggregationStatusTracker;
+    }
   }
 
   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index de3db6e..1c87843 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -138,6 +138,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
@@ -226,6 +227,8 @@
   // NM metrics publisher is set only if the timeline service v.2 is enabled
   private NMTimelinePublisher nmMetricsPublisher;
 
+  private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
+
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@@ -283,6 +286,16 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
 
     addService(dispatcher);
 
+    this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
+        context);
+    // Publish the tracker through the context; without this,
+    // Context#getNMLogAggregationStatusTracker() returns null for callers.
+    if (context instanceof
+        org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext) {
+      ((org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext)
+          context).setNMLogAggregationStatusTracker(
+              this.nmLogAggregationStatusTracker);
+    }
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -558,6 +571,15 @@ protected NMTimelinePublisher createNMTimelinePublisher(Context ctxt) {
     return nmTimelinePublisherLocal;
   }
 
+  /**
+   * Creates the tracker that caches log aggregation statuses for this node.
+   * Subclasses may override this to supply a different tracker.
+   */
+  protected NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
+      Context ctxt) {
+    return new NMLogAggregationStatusTracker(ctxt);
+  }
+
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
@@ -653,6 +675,7 @@ protected void serviceStart() throws Exception {
       }
     }
 
+    this.nmLogAggregationStatusTracker.start();
     LOG.info("ContainerManager started at " + connectAddress);
     LOG.info("ContainerManager bound to " + initialAddress);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 4ac150a..c7e06ff 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -385,7 +385,8 @@ private void sendLogAggregationReport(
         logAggregationSucceedInThisCycle
             ? LogAggregationStatus.RUNNING
             : LogAggregationStatus.RUNNING_WITH_FAILURE;
-    sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
+    sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage,
+        false);
     if (appFinished) {
       // If the app is finished, one extra final report with log aggregation
       // status SUCCEEDED/FAILED will be sent to RM to inform the RM
@@ -394,18 +395,26 @@ private void sendLogAggregationReport(
           renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
               ? LogAggregationStatus.FAILED
               : LogAggregationStatus.SUCCEEDED;
-      sendLogAggregationReportInternal(finalLogAggregationStatus, "");
+      sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);
     }
   }
 
   private void sendLogAggregationReportInternal(
-      LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
+      LogAggregationStatus logAggregationStatus, String diagnosticMessage,
+      boolean finalized) {
     LogAggregationReport report =
         Records.newRecord(LogAggregationReport.class);
     report.setApplicationId(appId);
     report.setDiagnosticMessage(diagnosticMessage);
     report.setLogAggregationStatus(logAggregationStatus);
     this.context.getLogAggregationStatusForApps().add(report);
+    // The context is not guaranteed to carry a tracker, so guard against
+    // null instead of failing the report.
+    if (this.context.getNMLogAggregationStatusTracker() != null) {
+      this.context.getNMLogAggregationStatusTracker()
+          .updateLogAggregationStatus(appId, logAggregationStatus,
+              System.currentTimeMillis(), diagnosticMessage, finalized);
+    }
   }
 
   @SuppressWarnings("unchecked")
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
new file mode 100644
index 0000000..8059013
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Caches the most recent log aggregation status reported for each
+ * application on this NodeManager so that reports built from the cache can
+ * be pulled later. Statuses of applications that the NodeManager no longer
+ * knows are dropped once they have not been modified for the rolling
+ * interval.
+ */
+public class NMLogAggregationStatusTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);
+
+  // Shared lock: several status updates may run at the same time.
+  private final ReadLock updateLocker;
+  // Exclusive lock: held while the cache is pulled or rolled over.
+  private final WriteLock pullLocker;
+  private final Context nmContext;
+  private final Configuration conf;
+  // Roll-over period and retention of statuses of unknown applications (ms).
+  private final long rollingInterval = 5 * 60 * 1000;
+  private final Timer timer;
+  // Concurrent because updates modify it while holding only the shared lock.
+  private final ConcurrentMap<ApplicationId, LogAggregationTracker> trackers;
+
+  /**
+   * @param context the NodeManager context used to look up applications
+   */
+  public NMLogAggregationStatusTracker(Context context) {
+    this.nmContext = context;
+    this.conf = context.getConf();
+    this.trackers = new ConcurrentHashMap<>();
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.updateLocker = lock.readLock();
+    this.pullLocker = lock.writeLock();
+    // Daemon thread: a tracker that is never stopped must not keep the JVM
+    // alive.
+    this.timer = new Timer("NMLogAggregationStatusTracker", true);
+  }
+
+  /** Schedules the periodic roll-over of the cached statuses. */
+  public void start() {
+    this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
+        rollingInterval, rollingInterval);
+  }
+
+  /** Cancels the periodic roll-over. */
+  public void stop() {
+    this.timer.cancel();
+  }
+
+  /**
+   * Records a log aggregation status update for an application. The update
+   * is ignored when the cached status is already finalized, when it is older
+   * than the cached status, or when the application is unknown to the
+   * NodeManager and the update is older than the rolling interval.
+   *
+   * @param appId the application the update belongs to
+   * @param logAggregationStatus the reported status
+   * @param updateTime the time of the update, in milliseconds
+   * @param diagnosis the diagnostic message of the update
+   * @param finalized whether the status is final for the application
+   */
+  public void updateLogAggregationStatus(ApplicationId appId,
+      LogAggregationStatus logAggregationStatus, long updateTime,
+      String diagnosis, boolean finalized) {
+    this.updateLocker.lock();
+    try {
+      LogAggregationTracker tracker = trackers.get(appId);
+      if (tracker == null) {
+        Application application = this.nmContext.getApplications().get(appId);
+        if (application == null) {
+          // The application has already finished or is unknown. Cache the
+          // update only if it is still within the retention period;
+          // otherwise ignore it.
+          long currentTime = System.currentTimeMillis();
+          if (currentTime - updateTime > rollingInterval) {
+            LOG.warn("Ignore the log aggregation status update request "
+                + "for the application:" + appId + ". The log aggregation status"
+                + " update time is " + updateTime + " while the request process "
+                + "time is " + currentTime + ".");
+            return;
+          }
+        }
+        LogAggregationTracker newTracker = new LogAggregationTracker(
+            logAggregationStatus, diagnosis);
+        newTracker.setLastModifiedTime(updateTime);
+        newTracker.setFinalized(finalized);
+        tracker = trackers.putIfAbsent(appId, newTracker);
+        if (tracker == null) {
+          return;
+        }
+        // A concurrent update cached a status first; merge into that entry.
+      }
+      synchronized (tracker) {
+        if (tracker.isFinalized()) {
+          LOG.warn("Ignore the log aggregation status update request "
+              + "for the application:" + appId + ". The cached log aggregation "
+              + "status is " + tracker.getLogAggregationStatus() + ".");
+        } else if (tracker.getLastModifiedTime() > updateTime) {
+          LOG.warn("Ignore the log aggregation status update request "
+              + "for the application:" + appId + ". The request log "
+              + "aggregation status update is older than the cached "
+              + "log aggregation status.");
+        } else {
+          tracker.setLogAggregationStatus(logAggregationStatus);
+          tracker.setDiagnosis(diagnosis);
+          tracker.setLastModifiedTime(updateTime);
+          tracker.setFinalized(finalized);
+        }
+      }
+    } finally {
+      this.updateLocker.unlock();
+    }
+  }
+
+  /**
+   * Builds a report for every cached status.
+   *
+   * @return one report per cached application
+   */
+  public List<LogAggregationReport> pullCachedLogAggregationReports() {
+    this.pullLocker.lock();
+    try {
+      List<LogAggregationReport> reports = new ArrayList<>();
+      for (Entry<ApplicationId, LogAggregationTracker> tracker :
+          trackers.entrySet()) {
+        LogAggregationTracker current = tracker.getValue();
+        LogAggregationReport report = LogAggregationReport.newInstance(
+            tracker.getKey(), current.getLogAggregationStatus(),
+            current.getDiagnosis());
+        reports.add(report);
+      }
+      return reports;
+    } finally {
+      this.pullLocker.unlock();
+    }
+  }
+
+  /** Timer task that rolls over the cached statuses. */
+  private class LogAggregationStatusRoller extends TimerTask {
+    @Override
+    public void run() {
+      rollLogAggregationStatus();
+    }
+  }
+
+  /**
+   * Drops the cached status of every application that the NodeManager no
+   * longer knows and that has not been modified for the rolling interval.
+   */
+  @Private
+  void rollLogAggregationStatus() {
+    this.pullLocker.lock();
+    try {
+      long currentTimeStamp = System.currentTimeMillis();
+      LOG.info("Rolling over the cached log aggregation status.");
+      Iterator<Entry<ApplicationId, LogAggregationTracker>> it = trackers
+          .entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<ApplicationId, LogAggregationTracker> tracker = it.next();
+        // the application has finished.
+        if (nmContext.getApplications().get(tracker.getKey()) == null) {
+          if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
+              > rollingInterval) {
+            it.remove();
+          }
+        }
+      }
+    } finally {
+      this.pullLocker.unlock();
+    }
+  }
+
+  /** Mutable cache entry holding the last status of one application. */
+  private static class LogAggregationTracker {
+    private LogAggregationStatus logAggregationStatus;
+    private long lastModifiedTime;
+    private boolean finalized;
+    private String diagnosis;
+
+    public LogAggregationTracker(
+        LogAggregationStatus logAggregationStatus, String diagnosis) {
+      this.setLogAggregationStatus(logAggregationStatus);
+      this.setDiagnosis(diagnosis);
+    }
+
+    public LogAggregationStatus getLogAggregationStatus() {
+      return logAggregationStatus;
+    }
+
+    public void setLogAggregationStatus(
+        LogAggregationStatus logAggregationStatus) {
+      this.logAggregationStatus = logAggregationStatus;
+    }
+
+    public long getLastModifiedTime() {
+      return lastModifiedTime;
+    }
+
+    public void setLastModifiedTime(long lastModifiedTime) {
+      this.lastModifiedTime = lastModifiedTime;
+    }
+
+    public boolean isFinalized() {
+      return finalized;
+    }
+
+    public void setFinalized(boolean finalized) {
+      this.finalized = finalized;
+    }
+
+    public String getDiagnosis() {
+      return diagnosis;
+    }
+
+    public void setDiagnosis(String diagnosis) {
+      this.diagnosis = diagnosis;
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 56d48ef..07e5800 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -73,6 +73,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
@@ -798,5 +799,10 @@ public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
     public NMTimelinePublisher getNMTimelinePublisher() {
       return null;
     }
+
+    @Override
+    public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
+      return null;
+    }
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index de8386d..641ebb7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -423,8 +423,15 @@ public RegisterNodeManagerResponse registerNodeManager(
               .handle(new NodeRemovedSchedulerEvent(rmNode));
 
           this.rmContext.getRMNodes().put(nodeId, rmNode);
+          RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, null,
+              null);
+          if (request.getLogAggregationReportsForApps() != null
+              && !request.getLogAggregationReportsForApps().isEmpty()) {
+            startEvent.setLogAggregationReportsForApps(request
+                .getLogAggregationReportsForApps());
+          }
           this.rmContext.getDispatcher().getEventHandler()
-              .handle(new RMNodeStartedEvent(nodeId, null, null));
+              .handle(startEvent);
 
         } else {
           // Reset heartbeat ID since node just restarted.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7ad47fb..1abe738 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -866,6 +866,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(
               NodesListManagerEventType.NODE_USABLE, rmNode));
+      List<LogAggregationReport> logAggregationReportsForApps =
+          startEvent.getLogAggregationReportsForApps();
+      if (logAggregationReportsForApps != null
+          && !logAggregationReportsForApps.isEmpty()) {
+        rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
+      }
     }
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
index 4fc983a..3976994 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
@@ -22,12 +22,14 @@
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 
 public class RMNodeStartedEvent extends RMNodeEvent {
 
   private List containerStatuses;
   private List runningApplications;
+  private List<LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStartedEvent(NodeId nodeId,
       List containerReports,
@@ -44,4 +46,21 @@ public RMNodeStartedEvent(NodeId nodeId,
   public List getRunningApplications() {
     return runningApplications;
   }
+
+  /**
+   * @return the log aggregation reports attached to this event, or
+   *         {@code null} if none were set
+   */
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
+    return this.logAggregationReportsForApps;
+  }
+
+  /**
+   * @param logAggregationReportsForApps the log aggregation reports to
+   *          attach to this event
+   */
+  public void setLogAggregationReportsForApps(
+      List<LogAggregationReport> logAggregationReportsForApps) {
+    this.logAggregationReportsForApps = logAggregationReportsForApps;
+  }
 }