diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 46e3323..1c2ef7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1000,8 +1000,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - - /** + + /** Log aggregation service class. */ + public static final String LOG_AGGREGATION_SERVICE_CLASS = + YARN_PREFIX + "log-aggregation-service.class"; + public static final String DEFAULT_LOG_AGGREGATION_SERVICE_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.containermanager." + + "logaggregation.LogAggregationService"; + + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 668821d..8e306b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -91,6 +91,8 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_LOG_AGGREGATION_SERVICE_CLASS); // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e956507..891b6d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1129,6 +1129,12 @@ + The service class for log aggregation. + yarn.log-aggregation-service.class + org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService + + + Whether to enable log aggregation. Log aggregation collects each container's logs and moves these logs onto a file-system, for e.g. HDFS, after the application completes. Users can configure the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index cb63ae3..07ea743 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -22,6 +22,8 @@ import java.io.DataInputStream; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException; import org.apache.hadoop.yarn.exceptions.InvalidContainerException; @@ -421,8 +424,25 @@ protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { - return new LogAggregationService(this.dispatcher, context, - deletionService, dirsHandler); + try { + Class clazz = Class + .forName(conf.get(YarnConfiguration.LOG_AGGREGATION_SERVICE_CLASS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_SERVICE_CLASS)); + LOG.info( + "Created LogAggregationService class: " + clazz.getCanonicalName()); + Constructor cons = + clazz.getConstructor(Dispatcher.class, Context.class, + DeletionService.class, LocalDirsHandlerService.class); + LogAggregationService logAggregationService = + (LogAggregationService) cons.newInstance(this.dispatcher, context, + deletionService, dirsHandler); + return logAggregationService; + } catch (ClassNotFoundException | NoSuchMethodException + | InstantiationException | IllegalAccessException + | InvocationTargetException e) { + LOG.error("Log aggregation service not initialized", e); + throw new YarnRuntimeException(e); + } } else { return new NonAggregatingLogHandler(this.dispatcher, deletionService, dirsHandler, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 872b805..d0c4e4f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -97,29 +97,29 @@ private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; - private final LocalDirsHandlerService dirsHandler; - private final Dispatcher dispatcher; - private final ApplicationId appId; - private final String applicationId; - private boolean logAggregationDisabled = false; - private final Configuration conf; - private final DeletionService delService; - private final UserGroupInformation userUgi; - private final Path remoteNodeLogFileForApp; - private final Path remoteNodeTmpLogFileForApp; - - private final BlockingQueue pendingContainers; - private final AtomicBoolean appFinishing = new AtomicBoolean(); - private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); - private final AtomicBoolean aborted = new AtomicBoolean(); - private final Map appAcls; - private final FileContext lfs; - private final LogAggregationContext logAggregationContext; - private final Context context; - private final int retentionSize; - private final long rollingMonitorInterval; - private final boolean logAggregationInRolling; - private final NodeId nodeId; + protected final LocalDirsHandlerService dirsHandler; + protected final Dispatcher dispatcher; + protected final ApplicationId appId; + protected final String applicationId; + protected boolean logAggregationDisabled = false; + protected final Configuration conf; + protected final DeletionService delService; + protected final UserGroupInformation userUgi; + protected final Path remoteNodeLogFileForApp; + protected final Path remoteNodeTmpLogFileForApp; + + protected final BlockingQueue pendingContainers; + protected final AtomicBoolean appFinishing = new AtomicBoolean(); + protected final AtomicBoolean appAggregationFinished = new AtomicBoolean(); + protected final AtomicBoolean aborted = new AtomicBoolean(); + protected final Map appAcls; + protected final FileContext lfs; + protected final LogAggregationContext logAggregationContext; + protected final Context context; + protected final int retentionSize; + protected final long rollingMonitorInterval; + protected final boolean logAggregationInRolling; + protected final NodeId nodeId; // These variables are only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); @@ -194,9 +194,9 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.recoveredLogInitedTime = recoveredLogInitedTime; } - private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { - ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf); - String params = getLogAggPolicyParameters(conf); + protected ContainerLogAggregationPolicy getLogAggPolicy(Configuration myConf) { + ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(myConf); + String params = getLogAggPolicyParameters(myConf); if (params != null) { policy.parseParameters(params); } @@ -205,7 +205,7 @@ private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { // Use the policy class specified in LogAggregationContext if available. // Otherwise use the cluster-wide default policy class. - private ContainerLogAggregationPolicy getLogAggPolicyInstance( + protected ContainerLogAggregationPolicy getLogAggPolicyInstance( Configuration conf) { Class policyClass = null; if (this.logAggregationContext != null) { @@ -242,18 +242,18 @@ private ContainerLogAggregationPolicy getLogAggPolicyInstance( // Use the policy parameters specified in LogAggregationContext if available. // Otherwise use the cluster-wide default policy parameters. - private String getLogAggPolicyParameters(Configuration conf) { + protected String getLogAggPolicyParameters(Configuration myConf) { String params = null; if (this.logAggregationContext != null) { params = this.logAggregationContext.getLogAggregationPolicyParameters(); } if (params == null) { - params = conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS); + params = myConf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS); } return params; } - private void uploadLogsForContainers(boolean appFinished) { + protected void uploadLogsForContainers(boolean appFinished) { if (this.logAggregationDisabled) { return; } @@ -423,7 +423,7 @@ protected LogWriter createLogWriter() throws IOException { this.userUgi); } - private void sendLogAggregationReport( + protected void sendLogAggregationReport( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); @@ -433,7 +433,7 @@ private void sendLogAggregationReport( this.context.getLogAggregationStatusForApps().add(report); } - private void cleanOldLogs() { + protected void cleanOldLogs() { try { final FileSystem remoteFS = this.remoteNodeLogFileForApp.getFileSystem(conf); @@ -509,7 +509,7 @@ public void run() { } @SuppressWarnings("unchecked") - private void doAppLogAggregation() { + protected void doAppLogAggregation() { while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { @@ -545,7 +545,7 @@ private void doAppLogAggregation() { this.appAggregationFinished.set(true); } - private void doAppLogAggregationPostCleanUp() { + protected void doAppLogAggregationPostCleanUp() { // Remove the local app-log-dirs List localAppLogDirs = new ArrayList(); for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { @@ -575,7 +575,7 @@ private Path getRemoteNodeTmpLogFileForApp() { // TODO: The condition: containerId.getId() == 1 to determine an AM container // is not always true. - private boolean shouldUploadLogs(ContainerLogContext logContext) { + protected boolean shouldUploadLogs(ContainerLogContext logContext) { return logAggPolicy.shouldDoLogAggregation(logContext); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index a4ae643..72b8bf5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -89,28 +89,28 @@ * Permissions for the top level directory under which app directories will be * created. */ - private static final FsPermission TLDIR_PERMISSIONS = FsPermission + public static final FsPermission TLDIR_PERMISSIONS = FsPermission .createImmutable((short) 01777); /** * Permissions for the Application directory. */ - private static final FsPermission APP_DIR_PERMISSIONS = FsPermission + public static final FsPermission APP_DIR_PERMISSIONS = FsPermission .createImmutable((short) 0770); - private final Context context; - private final DeletionService deletionService; - private final Dispatcher dispatcher; + protected final Context context; + protected final DeletionService deletionService; + protected final Dispatcher dispatcher; - private LocalDirsHandlerService dirsHandler; - Path remoteRootLogDir; - String remoteRootLogDirSuffix; - private NodeId nodeId; + protected LocalDirsHandlerService dirsHandler; + protected Path remoteRootLogDir; + protected String remoteRootLogDirSuffix; + protected NodeId nodeId; - private final ConcurrentMap appLogAggregators; - private boolean logPermError = true; + protected final ConcurrentMap appLogAggregators; + protected boolean logPermError = true; @VisibleForTesting - ExecutorService threadPool; + protected ExecutorService threadPool; public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService dirsHandler) { @@ -182,7 +182,7 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - private void stopAggregators() { + protected void stopAggregators() { threadPool.shutdown(); boolean supervised = getConfig().getBoolean( YarnConfiguration.NM_RECOVERY_SUPERVISED, @@ -217,24 +217,26 @@ private void stopAggregators() { } } - protected FileSystem getFileSystem(Configuration conf) throws IOException { - return this.remoteRootLogDir.getFileSystem(conf); + protected FileSystem getFileSystem(Configuration conf, Path myRemoteRootLogDir) + throws IOException { + return myRemoteRootLogDir.getFileSystem(conf); } - void verifyAndCreateRemoteLogDir(Configuration conf) { + protected void verifyAndCreateRemoteLogDir(Configuration conf, + Path myRemoteRootLogDir) { // Checking the existence of the TLD FileSystem remoteFS = null; try { - remoteFS = getFileSystem(conf); + remoteFS = getFileSystem(conf, myRemoteRootLogDir); } catch (IOException e) { throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e); } boolean remoteExists = true; try { FsPermission perms = - remoteFS.getFileStatus(this.remoteRootLogDir).getPermission(); + remoteFS.getFileStatus(myRemoteRootLogDir).getPermission(); if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) { - LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir + LOG.warn("Remote Root Log Dir [" + myRemoteRootLogDir + "] already exist, but with incorrect permissions. " + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms + "]." + " The cluster may have problems with multiple users."); @@ -247,20 +249,20 @@ void verifyAndCreateRemoteLogDir(Configuration conf) { } catch (IOException e) { throw new YarnRuntimeException( "Failed to check permissions for dir [" - + this.remoteRootLogDir + "]", e); + + myRemoteRootLogDir + "]", e); } if (!remoteExists) { - LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir + LOG.warn("Remote Root Log Dir [" + myRemoteRootLogDir + "] does not exist. Attempting to create it."); try { Path qualified = - this.remoteRootLogDir.makeQualified(remoteFS.getUri(), + myRemoteRootLogDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS)); remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS)); } catch (IOException e) { throw new YarnRuntimeException("Failed to create remoteLogDir [" - + this.remoteRootLogDir + "]", e); + + myRemoteRootLogDir + "]", e); } } } @@ -301,6 +303,7 @@ private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm) } protected void createAppDir(final String user, final ApplicationId appId, + final Path myRemoteRootLogDir, final String myRemoteRootLogDirSuffix, UserGroupInformation userUgi) { try { userUgi.doAs(new PrivilegedExceptionAction() { @@ -308,26 +311,24 @@ protected void createAppDir(final String user, final ApplicationId appId, public Object run() throws Exception { try { // TODO: Reuse FS for user? - FileSystem remoteFS = getFileSystem(getConfig()); + FileSystem remoteFS = getFileSystem(getConfig(), myRemoteRootLogDir); // Only creating directories if they are missing to avoid // unnecessary load on the filesystem from all of the nodes Path appDir = LogAggregationUtils.getRemoteAppLogDir( - LogAggregationService.this.remoteRootLogDir, appId, user, - LogAggregationService.this.remoteRootLogDirSuffix); + myRemoteRootLogDir, appId, user, myRemoteRootLogDirSuffix); appDir = appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - LogAggregationService.this.remoteRootLogDir, user, - LogAggregationService.this.remoteRootLogDirSuffix); + myRemoteRootLogDir, user, myRemoteRootLogDirSuffix); suffixDir = suffixDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - LogAggregationService.this.remoteRootLogDir, user); + Path userDir = LogAggregationUtils + .getRemoteLogUserDir(myRemoteRootLogDir, user); userDir = userDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); @@ -355,13 +356,13 @@ public Object run() throws Exception { } @SuppressWarnings("unchecked") - private void initApp(final ApplicationId appId, String user, + protected void initApp(final ApplicationId appId, String user, Credentials credentials, Map appAcls, LogAggregationContext logAggregationContext, long recoveredLogInitedTime) { ApplicationEvent eventResponse; try { - verifyAndCreateRemoteLogDir(getConfig()); + verifyAndCreateRemoteLogDir(getConfig(), this.remoteRootLogDir); initAppAggregator(appId, user, credentials, appAcls, logAggregationContext, recoveredLogInitedTime); eventResponse = new ApplicationEvent(appId, @@ -374,7 +375,7 @@ private void initApp(final ApplicationId appId, String user, this.dispatcher.getEventHandler().handle(eventResponse); } - FileContext getLocalFileContext(Configuration conf) { + protected FileContext getLocalFileContext(Configuration conf) { try { return FileContext.getLocalFSFileContext(conf); } catch (IOException e) { @@ -409,7 +410,8 @@ protected void initAppAggregator(final ApplicationId appId, String user, YarnRuntimeException appDirException = null; try { // Create the app dir - createAppDir(user, appId, userUgi); + createAppDir(user, appId, this.remoteRootLogDir, + this.remoteRootLogDirSuffix, userUgi); } catch (Exception e) { appLogAggregator.disableLogAggregation(); if (!(e instanceof YarnRuntimeException)) { @@ -453,7 +455,7 @@ int getNumAggregators() { return this.appLogAggregators.size(); } - private void stopContainer(ContainerId containerId, int exitCode) { + protected void stopContainer(ContainerId containerId, int exitCode) { // A container is complete. Put this containers' logs up for aggregation if // this containers' logs are needed. @@ -477,7 +479,7 @@ private void stopContainer(ContainerId containerId, int exitCode) { } @SuppressWarnings("unchecked") - private void stopApp(ApplicationId appId) { + protected void stopApp(ApplicationId appId) { // App is complete. Finish up any containers' pending log aggregation and // close the application specific logFile. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 1edb841..80c45d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -549,10 +549,9 @@ public void testVerifyAndCreateRemoteDirsFailure() logAggregationService.init(this.conf); YarnRuntimeException e = new YarnRuntimeException("KABOOM!"); - doThrow(e) - .when(logAggregationService).verifyAndCreateRemoteLogDir( - any(Configuration.class)); - + doThrow(e).when(logAggregationService) + .verifyAndCreateRemoteLogDir(any(Configuration.class), any(Path.class)); + logAggregationService.start(); // Now try to start an application @@ -618,8 +617,9 @@ public void testVerifyAndCreateRemoteDirNonExistence() boolean existsBefore = aNewFile.exists(); assertTrue("The new file already exists!", !existsBefore); - logAggregationService.verifyAndCreateRemoteLogDir(this.conf); - + logAggregationService.verifyAndCreateRemoteLogDir(this.conf, + new Path(aNewFile.getAbsolutePath())); + boolean existsAfter = aNewFile.exists(); assertTrue("The new aggregate file is not successfully created", existsAfter); aNewFile.delete(); //housekeeping @@ -644,7 +644,8 @@ public void testAppLogDirCreation() throws Exception { LogAggregationService aggSvc = new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler) { @Override - protected FileSystem getFileSystem(Configuration conf) { + protected FileSystem getFileSystem(Configuration conf, + Path myRemoteRootLogDir) { return spyFs; } }; @@ -762,9 +763,9 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() appLogDir.mkdir(); Exception e = new RuntimeException("KABOOM!"); - doThrow(e) - .when(logAggregationService).createAppDir(any(String.class), - any(ApplicationId.class), any(UserGroupInformation.class)); + doThrow(e).when(logAggregationService).createAppDir(any(String.class), + any(ApplicationId.class), any(Path.class), any(String.class), + any(UserGroupInformation.class)); LogAggregationContext contextWithAMAndFailed = Records.newRecord(LogAggregationContext.class); contextWithAMAndFailed.setLogAggregationPolicyClassName(