diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index c9453b3..1524f71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -168,17 +168,31 @@ public String toString() { private final Set alreadyUploadedLogFiles; private Set allExistingFileMeta = new HashSet(); private final boolean appFinished; + + /** + * The retention context to determine if log files are older than + * the retention policy configured. + */ + private final LogRetentionContext logRetentionContext; + /** + * The set of log files that are older than retention policy that will + * not be uploaded but ready for deletion. + */ + private final Set noMoreRetentionLogFiles = new HashSet(); + // TODO Maybe add a version string here. Instead of changing the version of // the entire k-v format public LogValue(List rootLogDirs, ContainerId containerId, String user) { - this(rootLogDirs, containerId, user, null, new HashSet(), true); + this(rootLogDirs, containerId, user, null, new HashSet(), + null, true); } public LogValue(List rootLogDirs, ContainerId containerId, String user, LogAggregationContext logAggregationContext, - Set alreadyUploadedLogFiles, boolean appFinished) { + Set alreadyUploadedLogFiles, + LogRetentionContext retentionContext, boolean appFinished) { this.rootLogDirs = new ArrayList(rootLogDirs); this.containerId = containerId; this.user = user; @@ -188,9 +202,10 @@ public LogValue(List rootLogDirs, ContainerId containerId, this.logAggregationContext = logAggregationContext; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; this.appFinished = appFinished; + this.logRetentionContext = retentionContext; } - private Set getPendingLogFilesToUploadForThisContainer() { + public Set getPendingLogFilesToUploadForThisContainer() { Set pendingUploadFiles = new HashSet(); for (String rootLogDir : this.rootLogDirs) { File appLogDir = @@ -297,6 +312,14 @@ public String getUser() { this.allExistingFileMeta.add(getLogFileMetaData(logFile)); } + // if log files are older than retention policy, do not upload them. + // but schedule them for deletion. + if(logRetentionContext != null && !logRetentionContext.shouldRetainLog()){ + noMoreRetentionLogFiles.addAll(candidates); + candidates.clear(); + return candidates; + } + if (this.logAggregationContext != null && candidates.size() > 0) { filterFiles( this.appFinished ? this.logAggregationContext.getIncludePattern() @@ -318,6 +341,7 @@ public boolean apply(File next) { }); candidates = Sets.newHashSet(mask); } + return candidates; } @@ -352,6 +376,14 @@ private void filterFiles(String pattern, Set candidates, return info; } + public Set getNoMoreRetentionLogFiles() { + Set path = new HashSet(); + for(File file: this.noMoreRetentionLogFiles) { + path.add(new Path(file.getAbsolutePath())); + } + return path; + } + public Set getAllExistingFilesMeta() { return this.allExistingFileMeta; } @@ -363,6 +395,39 @@ private String getLogFileMetaData(File file) { } /** + * A context for log retention to determine if files are older than + * the retention policy configured in YarnConfiguration + */ + public static class LogRetentionContext { + /** + * The time used with logRetentionMillis, to determine ages of + * log files and if files need to be uploaded. + */ + private final long logInitedTimeMillis; + /** + * The numbers of milli seconds since a log file is created to determine + * if we should upload it. -1 if disabled. + * see YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details. + */ + private final long logRetentionMillis; + + public LogRetentionContext(long logInitedTimeMillis, long + logRetentionMillis) { + this.logInitedTimeMillis = logInitedTimeMillis; + this.logRetentionMillis = logRetentionMillis; + } + + public boolean isDisabled() { + return logInitedTimeMillis < 0 || logRetentionMillis < 0; + } + + public boolean shouldRetainLog() { + return isDisabled() || + System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis; + } + } + + /** * The writer that writes out the aggregated logs. */ @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 94d5c1e..d81cc45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -338,7 +338,7 @@ private void recoverApplication(ContainerManagerApplicationProto p) LOG.info("Recovering application " + appId); ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context); + creds, context, p.getAppLogAggregationInitedTime()); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index fbc8453..819f820 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -26,15 +26,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -47,13 +54,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; - import com.google.common.annotations.VisibleForTesting; /** @@ -79,14 +86,30 @@ Map containers = new HashMap(); + /** + * The timestamp when the log aggregation has started for this application. + * Used to determine the age of application log files during log aggregation. + * When logAggregationRentention policy is enabled, log files older than the + * retention policy will not be uploaded but scheduled for deletion. + */ + private long applicationLogInitedTimestamp = -1; + private final NMStateStoreService appStateStore; + public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId, - Credentials credentials, Context context) { + Credentials credentials, Context context, long recoveredLogInitedTime) { + this(dispatcher, user, appId, credentials, context); + setAppLogInitedTimestamp(recoveredLogInitedTime); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, Context context) { this.dispatcher = dispatcher; this.user = user; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); this.context = context; + this.appStateStore = context.getNMStateStore(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -242,7 +265,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, app.applicationACLs, - app.logAggregationContext)); + app.logAggregationContext, app.applicationLogInitedTimestamp)); } } @@ -262,7 +285,59 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + app.setAppLogInitedTimestamp(event.getTimestamp()); + try { + app.appStateStore.storeApplication(app.appId, buildAppProto(app)); + } catch (Exception ex) { + LOG.warn("failed to update application state in state store"); + } + } + } + + @VisibleForTesting + void setAppLogInitedTimestamp(long appLogInitedTimestamp) { + this.applicationLogInitedTimestamp = appLogInitedTimestamp; + } + + static ContainerManagerApplicationProto buildAppProto(ApplicationImpl app) { + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) app.appId).getProto()); + builder.setUser(app.getUser()); + + if (app.logAggregationContext != null) { + builder.setLogAggregationContext(( + (LogAggregationContextPBImpl)app.logAggregationContext).getProto()); + } + + builder.clearCredentials(); + if (app.credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + try { + app.credentials.writeTokenStorageToStream(dob); + builder.setCredentials(ByteString.copyFrom(dob.getData())); + } catch (IOException e) { + // should not occur + LOG.error("Cannot serialize credentials", e); + } } + + builder.clearAcls(); + if (app.applicationACLs != null) { + for (Map.Entry acl : app + .applicationACLs.entrySet()) { + YarnProtos.ApplicationACLMapProto p = YarnProtos + .ApplicationACLMapProto.newBuilder() + .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) + .setAcl(acl.getValue()) + .build(); + builder.addAcls(p); + } + } + + builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp); + + return builder.build(); } /** @@ -349,6 +424,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { } } + @SuppressWarnings("unchecked") void handleAppFinishWithContainersCleanedup() { // Delete Application level resources diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index da7fc14..98ca1ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; @@ -136,6 +137,14 @@ new HashMap(); private final ContainerLogAggregationPolicy logAggPolicy; + + /** + * The value recovered from state store to determine the age of application + * log files if log retention is enabled. Files older than retention policy + * will not be uploaded but scheduled for cleaning up. -1 if not recovered. + */ + private final long recoveredLogInitedTime; + public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, @@ -143,6 +152,18 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs) { + this(dispatcher, deletionService, conf, appId, userUgi, nodeId, + dirsHandler, remoteNodeLogFileForApp, appAcls, + logAggregationContext, context, lfs, -1); + } + + public AppLogAggregatorImpl(Dispatcher dispatcher, + DeletionService deletionService, Configuration conf, + ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, + Map appAcls, + LogAggregationContext logAggregationContext, Context context, + FileContext lfs, long recoveredLogInitedTime) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -168,16 +189,16 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.retentionSize = configuredRentionSize; } long configuredRollingMonitorInterval = conf.getLong( - YarnConfiguration - .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, - YarnConfiguration - .DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS); + YarnConfiguration + .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + YarnConfiguration + .DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS); boolean debug_mode = conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, - DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED); + DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED); if (configuredRollingMonitorInterval > 0 && configuredRollingMonitorInterval < - NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) { + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) { if (debug_mode) { this.rollingMonitorInterval = configuredRollingMonitorInterval; } else { @@ -208,8 +229,9 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.rollingMonitorInterval <= 0 || this.logAggregationContext == null || this.logAggregationContext.getRolledLogsIncludePattern() == null || this.logAggregationContext.getRolledLogsIncludePattern() - .isEmpty() ? false : true; + .isEmpty() ? false : true; this.logAggPolicy = getLogAggPolicy(conf); + this.recoveredLogInitedTime = recoveredLogInitedTime; } private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { @@ -294,7 +316,7 @@ private void uploadLogsForContainers(boolean appFinished) { // It includes: // a) all containers in pendingContainers: those containers are finished // and satisfy the ContainerLogAggregationPolicy. - // b) some set of running containers: For all the Running containers, + // b) some set of running containers: For all the Running containers // we use exitCode of 0 to find those which satisfy the // ContainerLogAggregationPolicy. Set pendingContainerInThisCycle = new HashSet(); @@ -324,9 +346,7 @@ private void uploadLogsForContainers(boolean appFinished) { logAggregationTimes++; try { - writer = - new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); + writer = createLogWriter(); // Write ACLs once when the writer is created. writer.writeApplicationACLs(appAcls); writer.writeApplicationOwner(this.userUgi.getShortUserName()); @@ -351,6 +371,8 @@ private void uploadLogsForContainers(boolean appFinished) { aggregator.doContainerLogAggregation(writer, appFinished); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; + Path[] paths = uploadedFilePathsInThisCycle.toArray(new + Path[uploadedFilePathsInThisCycle.size()]); this.delService.delete(this.userUgi.getShortUserName(), null, uploadedFilePathsInThisCycle .toArray(new Path[uploadedFilePathsInThisCycle.size()])); @@ -436,6 +458,11 @@ public Object run() throws Exception { } } + protected LogWriter createLogWriter() throws IOException { + return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, + this.userUgi); + } + private void sendLogAggregationReport( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = @@ -635,13 +662,21 @@ public synchronized void doLogAggregationOutOfBand() { this.notifyAll(); } - private class ContainerLogAggregator { + class ContainerLogAggregator { + private final AggregatedLogFormat.LogRetentionContext retentionContext; private final ContainerId containerId; - private Set uploadedFileMeta = - new HashSet(); - + private Set uploadedFileMeta = new HashSet(); public ContainerLogAggregator(ContainerId containerId) { this.containerId = containerId; + this.retentionContext = getRetentionContext(); + } + + private AggregatedLogFormat.LogRetentionContext getRetentionContext() { + final long logRetentionSecs = + conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS); + return new AggregatedLogFormat.LogRetentionContext( + recoveredLogInitedTime, logRetentionSecs * 1000); } public Set doContainerLogAggregation(LogWriter writer, @@ -652,8 +687,8 @@ public ContainerLogAggregator(ContainerId containerId) { final LogKey logKey = new LogKey(containerId); final LogValue logValue = new LogValue(dirsHandler.getLogDirsForRead(), containerId, - userUgi.getShortUserName(), logAggregationContext, - this.uploadedFileMeta, appFinished); + userUgi.getShortUserName(), logAggregationContext, + this.uploadedFileMeta, retentionContext, appFinished); try { writer.append(logKey, logValue); } catch (Exception e) { @@ -674,7 +709,11 @@ public boolean apply(String next) { }); this.uploadedFileMeta = Sets.newHashSet(mask); - return logValue.getCurrentUpLoadedFilesPath(); + + // need to return files uploaded or older-than-retention clean up. + return Sets.union(logValue.getCurrentUpLoadedFilesPath(), + logValue.getNoMoreRetentionLogFiles()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 2d6b900..d10c5bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -318,12 +318,13 @@ public Object run() throws Exception { @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, + long recoveredLogInitedTime) { ApplicationEvent eventResponse; try { verifyAndCreateRemoteLogDir(getConfig()); initAppAggregator(appId, user, credentials, appAcls, - logAggregationContext); + logAggregationContext, recoveredLogInitedTime); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); } catch (YarnRuntimeException e) { @@ -342,10 +343,10 @@ FileContext getLocalFileContext(Configuration conf) { } } - protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, + long recoveredLogInitedTime) { // Get user's FileSystem credentials final UserGroupInformation userUgi = @@ -360,7 +361,7 @@ protected void initAppAggregator(final ApplicationId appId, String user, getConfig(), appId, userUgi, this.nodeId, dirsHandler, getRemoteNodeLogFileForApp(appId, user), appAcls, logAggregationContext, this.context, - getLocalFileContext(getConfig())); + getLocalFileContext(getConfig()), recoveredLogInitedTime); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } @@ -460,7 +461,8 @@ public void handle(LogHandlerEvent event) { initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), appStartEvent.getCredentials(), appStartEvent.getApplicationAcls(), - appStartEvent.getLogAggregationContext()); + appStartEvent.getLogAggregationContext(), + appStartEvent.getRecoveredAppLogInitedTime()); break; case CONTAINER_FINISHED: LogHandlerContainerFinishedEvent containerFinishEvent = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index d3ff771..fb578f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -32,21 +32,36 @@ private final Credentials credentials; private final Map appAcls; private final LogAggregationContext logAggregationContext; + /** + * The value will be set when application is recovered from state store. + * We use this value in AppLogAggregatorImpl to determine, if log retention + * policy is enabled, if we need to upload old application log files. Files + * older than retention policy will not be uploaded but scheduled for + * deletion. + */ + private final long recoveredAppLogInitedTime; public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, Map appAcls) { - this(appId, user, credentials, appAcls, null); + this(appId, user, credentials, appAcls, null, -1); } public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, Map appAcls, LogAggregationContext logAggregationContext) { + this(appId, user, credentials, appAcls, logAggregationContext, -1); + } + + public LogHandlerAppStartedEvent(ApplicationId appId, String user, + Credentials credentials, Map appAcls, + LogAggregationContext logAggregationContext, long appLogInitedTime) { super(LogHandlerEventType.APPLICATION_STARTED); this.applicationId = appId; this.user = user; this.credentials = credentials; this.appAcls = appAcls; this.logAggregationContext = logAggregationContext; + this.recoveredAppLogInitedTime = appLogInitedTime; } public ApplicationId getApplicationId() { @@ -68,4 +83,8 @@ public String getUser() { public LogAggregationContext getLogAggregationContext() { return this.logAggregationContext; } + + public long getRecoveredAppLogInitedTime() { + return this.recoveredAppLogInitedTime; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index ade8c1a..b391454 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -30,6 +30,7 @@ message ContainerManagerApplicationProto { optional bytes credentials = 3; repeated ApplicationACLMapProto acls = 4; optional LogAggregationContextProto log_aggregation_context = 5; + optional int64 appLogAggregationInitedTime = 6 [ default = -1 ]; } message DeletionServiceDeleteTaskProto { @@ -39,6 +40,7 @@ message DeletionServiceDeleteTaskProto { optional int64 deletionTime = 4; repeated string basedirs = 5; repeated int32 successorIds = 6; + } message LocalizedResourceProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 370a207..8da3847 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -18,14 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.refEq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -41,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -61,13 +58,16 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; public class TestApplication { @@ -296,6 +296,27 @@ protected ContainerTokenIdentifier waitForContainerTokenToExpire( } @Test + public void + testApplicationOnAppLogHandlingInitedEvt_ShouldStoreLogInitedTime() + throws IOException { + WrappedApplication wa = new WrappedApplication(5, 314159265358979L, + "yak", 0); + wa.initApplication(); + + ArgumentCaptor applicationProto = + ArgumentCaptor.forClass(ContainerManagerApplicationProto.class); + + final long timestamp = wa.applicationLogInited(); + + verify(wa.stateStoreService).storeApplication(any(ApplicationId.class), + applicationProto.capture()); + + assertEquals(applicationProto.getValue().getAppLogAggregationInitedTime() + , timestamp); + } + + + @Test @SuppressWarnings("unchecked") public void testAppFinishedOnCompletedContainers() { WrappedApplication wa = null; @@ -484,7 +505,8 @@ public boolean matches(Object argument) { final Context context; final Map containerTokenIdentifierMap; final NMTokenSecretManagerInNM nmTokenSecretMgr; - + final NMStateStoreService stateStoreService; + final ApplicationId appId; final Application app; @@ -511,6 +533,8 @@ public boolean matches(Object argument) { dispatcher.register(LogHandlerEventType.class, logAggregationBus); nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class); + stateStoreService = mock(NMStateStoreService.class); + context = mock(Context.class); @@ -519,6 +543,7 @@ public boolean matches(Object argument) { when(context.getApplicationACLsManager()).thenReturn( new ApplicationACLsManager(conf)); when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); + when(context.getNMStateStore()).thenReturn(stateStoreService); // Setting master key MasterKey masterKey = new MasterKeyPBImpl(); @@ -586,6 +611,13 @@ public void applicationInited() { drainDispatcherEvents(); } + public long applicationLogInited() { + ApplicationEvent appEvt = new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); + app.handle(appEvt); + return appEvt.getTimestamp(); + } + public void appFinished() { app.handle(new ApplicationFinishEvent(appId, "Finish Application")); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java new file mode 100644 index 0000000..5622bfd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -0,0 +1,457 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.*; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class TestAppLogAggregatorImpl { + + private static final File localLogDir = new File("target", + TestAppLogAggregatorImpl.class.getName() + "-localLogDir"); + private static final File remoteLogFile = new File("target", + TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile"); + + @Test + public void testAggregatorWithRetentionPolicyDisabled_shouldUploadAllFiles() + throws Exception { + + final ApplicationId applicationId = + ApplicationId.newInstance(System.currentTimeMillis(), 0); + final ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + final ContainerId containerId = ContainerId.newContainerId(attemptId, 0); + + // create artificial log files + final File appLogDir = new File(localLogDir, + ConverterUtils.toString(applicationId)); + final File containerLogDir = new File(appLogDir, + ConverterUtils.toString(containerId)); + containerLogDir.mkdirs(); + final File logFile1 = new File(containerLogDir, "logfile1"); + logFile1.createNewFile(); + final File logFile2 = new File(containerLogDir, "logfile2"); + logFile2.createNewFile(); + final File logFile3 = new File(containerLogDir, "logfile3"); + logFile3.createNewFile(); + + final Set expectedFilesToDelete = new HashSet() { + { + add(logFile1.getAbsolutePath()); + add(logFile2.getAbsolutePath()); + add(logFile3.getAbsolutePath()); + } + }; + // deletion service with verification to check files to delete + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile(expectedFilesToDelete); + + final YarnConfiguration config = new YarnConfiguration(); + config.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 10000); + final long recoveredLogInitedTime = -1; + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(applicationId, localLogDir.getAbsolutePath(), + config, recoveredLogInitedTime, deletionServiceWithExpectedFiles); + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext(containerId, ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify uploaded files + ArgumentCaptor logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.logWriter).append(any(LogKey.class), + logValCaptor.capture()); + Set expectedUploadedFile = new HashSet() { + { + add(logFile1.getAbsolutePath()); + add(logFile2.getAbsolutePath()); + add(logFile3.getAbsolutePath()); + } + }; + Set filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded , expectedUploadedFile); + + // clean up + FileUtils.deleteDirectory(localLogDir); + FileUtils.deleteQuietly(remoteLogFile); + } + + @Before + public void setUp() throws IOException { + if(localLogDir.exists()) { + FileUtils.cleanDirectory(localLogDir); + } + if(remoteLogFile.exists()) { + FileUtils.cleanDirectory(remoteLogFile); + } + } + + @Test + public void testAggregatorWhenFilesOlderThanRetention_ShouldUploadNone() + throws IOException { + + final ApplicationId applicationId = + ApplicationId.newInstance(System.currentTimeMillis(), 0); + final ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + final ContainerId containerId = ContainerId.newContainerId(attemptId, 0); + + // create artificial log files + final File appLogDir = new File(localLogDir, + ConverterUtils.toString(applicationId)); + final File containerLogDir = new File(appLogDir, + ConverterUtils.toString(containerId)); + containerLogDir.mkdirs(); + final File logFile1 = new File(containerLogDir, "logfile1"); + logFile1.createNewFile(); + final File logFile2 = new File(containerLogDir, "logfile2"); + logFile2.createNewFile(); + final File logFile3 = new File(containerLogDir, "logfile3"); + logFile3.createNewFile(); + + final Set expectedFilesToDelete = new HashSet() { + { + add(logFile1.getAbsolutePath()); + add(logFile2.getAbsolutePath()); + add(logFile3.getAbsolutePath()); + } + }; + // deletion service with verification to check files to delete + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile(expectedFilesToDelete); + + final YarnConfiguration config = new YarnConfiguration(); + long week = 7 * 24 * 60 * 60; + config.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, week); + final long recoveredLogInitedTimeMillis = System.currentTimeMillis() - + 2*week; + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(applicationId, localLogDir.getAbsolutePath(), + config, recoveredLogInitedTimeMillis, + deletionServiceWithExpectedFiles); + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext(containerId, ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify uploaded files + ArgumentCaptor logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.logWriter).append(any(LogKey.class), + logValCaptor.capture()); + Set expectedUploadedFile = new HashSet(); + Set filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded , expectedUploadedFile); + + // clean up + FileUtils.deleteDirectory(localLogDir); + FileUtils.deleteQuietly(remoteLogFile); + } + + + @Test + public void testAggregatorWhenNoFileOlderThanRetentionPolicy_ShouldUploadAll() + throws IOException { + + final ApplicationId applicationId = + ApplicationId.newInstance(System.currentTimeMillis(), 0); + final ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + final ContainerId containerId = ContainerId.newContainerId(attemptId, 0); + + // create artificial log files + final File appLogDir = new File(localLogDir, + ConverterUtils.toString(applicationId)); + final File containerLogDir = new File(appLogDir, + ConverterUtils.toString(containerId)); + containerLogDir.mkdirs(); + final File logFile1 = new File(containerLogDir, "logfile1"); + logFile1.createNewFile(); + final File logFile2 = new File(containerLogDir, "logfile2"); + logFile2.createNewFile(); + final File logFile3 = new File(containerLogDir, "logfile3"); + logFile3.createNewFile(); + + final Set expectedFilesToDelete = new HashSet() { + { + add(logFile1.getAbsolutePath()); + add(logFile2.getAbsolutePath()); + add(logFile3.getAbsolutePath()); + } + }; + // deletion service with verification to check files to delete + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile(expectedFilesToDelete); + + final YarnConfiguration config = new YarnConfiguration(); + long week = 7 * 24 * 60 * 60; + config.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, week); + final long recoveredLogInitedTimeMillis = System.currentTimeMillis() - + 60*60; + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(applicationId, localLogDir.getAbsolutePath(), + config, recoveredLogInitedTimeMillis, + deletionServiceWithExpectedFiles); + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext(containerId, ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify uploaded files + ArgumentCaptor logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.logWriter).append(any(LogKey.class), + logValCaptor.capture()); + Set expectedUploadedFile = new HashSet() { + { + add(logFile1.getAbsolutePath()); + add(logFile2.getAbsolutePath()); + add(logFile3.getAbsolutePath()); + } + }; + Set filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded , expectedUploadedFile); + + // clean up + FileUtils.deleteDirectory(localLogDir); + FileUtils.deleteQuietly(remoteLogFile); + } + + private static void verifyFilesUploaded(Set filesUploaded, Set + filesExpected) { + final String errMsgPrefix = "The set of files uploaded are not the same " + + "as expected"; + if(filesUploaded.size() != filesUploaded.size()) { + fail(errMsgPrefix + ": actual size: " + filesUploaded.size() + " vs " + + "expected size: " + filesExpected.size()); + } + for(String file: filesExpected) { + if(!filesUploaded.contains(file)) { + fail(errMsgPrefix + ": expecting " + file); + } + } + } + + private static void verifyFilesToDelete(Set files2ToDelete, + Set filesExpected) { + final String errMsgPrefix = "The set of paths for deletion are not the " + + "same as expected"; + if(files2ToDelete.size() != filesExpected.size()) { + fail(errMsgPrefix + ": actual size: " + files2ToDelete.size() + " vs " + + "expected size: " + filesExpected.size()); + } + for(String file: filesExpected) { + if(!files2ToDelete.contains(file)) { + fail(errMsgPrefix + ": expecting " + file); + } + } + } + + private static AppLogAggregatorInTest createAppLogAggregator( + ApplicationId applicationId, String rootLogDir, + YarnConfiguration config, long recoveredLogInitedTimeMillis, + DeletionService deletionServiceWithFilesToExpect) + throws IOException { + + final Dispatcher dispatcher = CreateNullDispatcher(); + final NodeId nodeId = NodeId.newInstance("localhost", 0); + final String userId = "AppLogAggregatorTest"; + final UserGroupInformation ugi = UserGroupInformation.createRemoteUser + (userId); + final LocalDirsHandlerService dirsService = createLocalDirsHandlerService + (config, rootLogDir); + final DeletionService deletionService = deletionServiceWithFilesToExpect; + final LogAggregationContext logAggregationContext = null; + final Map appAcls = new HashMap<>(); + + final Context context = createContext(config); + final FileContext fakeLfs = mock(FileContext.class); + final Path remoteLogDirForApp = new Path(remoteLogFile.getAbsolutePath()); + + return new AppLogAggregatorInTest(dispatcher, deletionService, + config, applicationId, ugi, nodeId, dirsService, + remoteLogDirForApp, appAcls, logAggregationContext, + context, fakeLfs, recoveredLogInitedTimeMillis); + } + + /** Create a deletionService that verifies the paths of container log + * files passed to delete method of DeletionService by AppLogAggregatorImpl. + * This approach is* taken due to lack of support of varargs captor in + * current mockito version 1.8.5 (The support is added in 1.10.x) + **/ + private static DeletionService createDeletionServiceWithExpectedFile + (final Set expectedPathsForDeletion) { + DeletionService deletionServiceWithExpectedFiles = mock(DeletionService + .class); + // verify paths passed to first invocation of delete method against + // expected paths + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + Set paths = new HashSet<>(); + Object[] args = invocationOnMock.getArguments(); + for(int i = 2; i < args.length; i++) { + Path path = (Path) args[i]; + paths.add(path.toUri().getRawPath()); + } + verifyFilesToDelete(expectedPathsForDeletion, paths); + return null; + } + }).doNothing().when(deletionServiceWithExpectedFiles).delete( + any(String.class), any(Path.class), Matchers.anyVararg()); + + return deletionServiceWithExpectedFiles; + } + + private static Dispatcher CreateNullDispatcher() { + return new Dispatcher() { + @Override + public EventHandler getEventHandler() { + return new EventHandler() { + @Override + public void handle(Event event) { + // do nothing + } + }; + } + + @Override + public void register(Class eventType, + EventHandler handler) { + // do nothing + } + }; + } + + private static LocalDirsHandlerService createLocalDirsHandlerService + (YarnConfiguration conf, final String rootLogDir) { + LocalDirsHandlerService dirsHandlerService = new LocalDirsHandlerService() { + @Override + public List getLogDirsForRead() { + return new ArrayList() {{ + add(rootLogDir); + }}; + } + @Override + public List getLogDirsForCleanup() { + return new ArrayList() {{ + add(rootLogDir); + }}; + } + }; + + dirsHandlerService.init(conf); + // appLogAggregator only calls LocalDirsHandlerServer for local directories + // so it is ok to not start the service. + return dirsHandlerService; + } + + private static Context createContext(YarnConfiguration conf) { + return new NodeManager.NMContext( + new NMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInNM(), + null, + new ApplicationACLsManager(conf), + new NMNullStateStoreService()); + } + + private static final class AppLogAggregatorInTest extends + AppLogAggregatorImpl { + + final DeletionService deletionService; + final ApplicationId applicationId; + final LogWriter logWriter; + final ArgumentCaptor logValue; + + public AppLogAggregatorInTest(Dispatcher dispatcher, + DeletionService deletionService, Configuration conf, + ApplicationId appId, UserGroupInformation ugi, NodeId nodeId, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, + Map appAcls, + LogAggregationContext logAggregationContext, Context context, + FileContext lfs, long recoveredLogInitedTime) throws IOException { + super(dispatcher, deletionService, conf, appId, ugi, nodeId, + dirsHandler,remoteNodeLogFileForApp, appAcls, + logAggregationContext, context, lfs, recoveredLogInitedTime); + this.applicationId = appId; + this.deletionService = deletionService; + + this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp); + this.logValue = ArgumentCaptor.forClass(LogValue.class); + } + + @Override + protected LogWriter createLogWriter() { + return this.logWriter; + } + + private LogWriter getSpiedLogWriter(Configuration conf, + UserGroupInformation ugi, Path remoteAppLogFile) throws IOException { + return spy(new LogWriter(conf, remoteAppLogFile, ugi)); + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 0392b38..900a5eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -20,10 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -701,7 +698,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { doThrow(new YarnRuntimeException("KABOOM!")) .when(logAggregationService).initAppAggregator( eq(appId), eq(user), any(Credentials.class), - anyMap(), any(LogAggregationContext.class)); + anyMap(), any(LogAggregationContext.class), anyLong()); LogAggregationContext contextWithAMAndFailed = Records.newRecord(LogAggregationContext.class); contextWithAMAndFailed.setLogAggregationPolicyClassName(