diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 245dc103e93..5e1e3097d81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -28,6 +28,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.security.token.SecretManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +92,7 @@ private boolean logAggregationDisabled = false; private final Configuration conf; private final DeletionService delService; + private final String user; private final UserGroupInformation userUgi; private final Path remoteNodeLogFileForApp; private final Path remoteNodeTmpLogFileForApp; @@ -117,6 +120,7 @@ private final LogAggregationFileController logAggregationFileController; + private final Set invalidTokenApps; /** * The value recovered from state store to determine the age of application @@ -127,44 +131,47 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - Map appAcls, + ApplicationId appId, String user, UserGroupInformation userUgi, + NodeId nodeId, LocalDirsHandlerService dirsHandler, + Path remoteNodeLogFileForApp, Map appAcls, LogAggregationContext logAggregationContext, Context context, - FileContext lfs, long rollingMonitorInterval) { - this(dispatcher, deletionService, conf, appId, userUgi, nodeId, - dirsHandler, remoteNodeLogFileForApp, appAcls, - logAggregationContext, context, lfs, rollingMonitorInterval, -1, null); + FileContext lfs, long rollingMonitorInterval, + Set invalidTokenApps) { + this(dispatcher, deletionService, conf, appId, user, userUgi, nodeId, + dirsHandler, remoteNodeLogFileForApp, appAcls, logAggregationContext, + context, lfs, rollingMonitorInterval, -1, null, invalidTokenApps); } public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - Map appAcls, + ApplicationId appId, String user, UserGroupInformation userUgi, + NodeId nodeId, LocalDirsHandlerService dirsHandler, + Path remoteNodeLogFileForApp, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long rollingMonitorInterval, - long recoveredLogInitedTime) { - this(dispatcher, deletionService, conf, appId, userUgi, nodeId, + long recoveredLogInitedTime, Set invalidTokenApps) { + this(dispatcher, deletionService, conf, appId, user, userUgi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, logAggregationContext, context, lfs, rollingMonitorInterval, - recoveredLogInitedTime, null); + recoveredLogInitedTime, null, invalidTokenApps); } public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - Map appAcls, + ApplicationId appId, String user, UserGroupInformation userUgi, + NodeId nodeId, LocalDirsHandlerService dirsHandler, + Path remoteNodeLogFileForApp, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long rollingMonitorInterval, long recoveredLogInitedTime, - LogAggregationFileController logAggregationFileController) { + LogAggregationFileController logAggregationFileController, + Set invalidTokenApps) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; this.appId = appId; this.applicationId = appId.toString(); + this.user = user; this.userUgi = userUgi; this.dirsHandler = dirsHandler; this.pendingContainers = new LinkedBlockingQueue(); @@ -175,6 +182,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.nodeId = nodeId; this.logAggPolicy = getLogAggPolicy(conf); this.recoveredLogInitedTime = recoveredLogInitedTime; + this.invalidTokenApps = invalidTokenApps; this.logFileSizeThreshold = conf.getLong(YarnConfiguration.LOG_AGGREGATION_DEBUG_FILESIZE, YarnConfiguration.DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE); @@ -462,6 +470,7 @@ private void sendLogAggregationReportInternal( @Override public void run() { try { + verifyAndCreateAppDir(); doAppLogAggregation(); } catch (LogAggregationDFSException e) { // if the log aggregation could not be performed due to DFS issues @@ -485,6 +494,24 @@ public void run() { } } + private boolean verifyAndCreateAppDir() { + try { + logAggregationFileController.verifyAndCreateRemoteLogDir(); + // Create the app dir + logAggregationFileController.createAppDir(user, appId, userUgi); + } catch (Exception e) { + // add to disabled aggregators if due to InvalidToken + if (e.getCause() instanceof SecretManager.InvalidToken) { + invalidTokenApps.add(appId); + } + LOG.warn("Failed to verify and create log aggregator dir " + + "for the application " + applicationId, e); + disableLogAggregation(); + return false; + } + return true; + } + private void doAppLogAggregation() throws LogAggregationDFSException { while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 2280e750f88..73e18c4da07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -25,9 +25,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.security.token.SecretManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,36 +251,18 @@ protected void initAppAggregator(final ApplicationId appId, String user, LogAggregationFileController logAggregationFileController = getLogAggregationFileController(getConfig()); - logAggregationFileController.verifyAndCreateRemoteLogDir(); // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, - getConfig(), appId, userUgi, this.nodeId, dirsHandler, + getConfig(), appId, user, userUgi, this.nodeId, dirsHandler, logAggregationFileController.getRemoteNodeLogFileForApp(appId, user, nodeId), appAcls, logAggregationContext, this.context, getLocalFileContext(getConfig()), this.rollingMonitorInterval, - recoveredLogInitedTime, logAggregationFileController); + recoveredLogInitedTime, logAggregationFileController, + invalidTokenApps); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } - // wait until check for existing aggregator to create dirs - YarnRuntimeException appDirException = null; - try { - // Create the app dir - logAggregationFileController.createAppDir(user, appId, userUgi); - } catch (Exception e) { - appLogAggregator.disableLogAggregation(); - - // add to disabled aggregators if due to InvalidToken - if (e.getCause() instanceof SecretManager.InvalidToken) { - invalidTokenApps.add(appId); - } - if (!(e instanceof YarnRuntimeException)) { - appDirException = new YarnRuntimeException(e); - } else { - appDirException = (YarnRuntimeException)e; - } - } // TODO Get the user configuration for the list of containers that need log // aggregation. @@ -300,10 +279,6 @@ public void run() { } }; this.threadPool.execute(aggregatorWrapper); - - if (appDirException != null) { - throw appDirException; - } } protected void closeFileSystems(final UserGroupInformation userUgi) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index ab9d0f1c097..61a1293d10e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -21,8 +21,12 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -33,8 +37,10 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.server.api.ContainerLogContext; @@ -51,13 +57,17 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -69,18 +79,19 @@ import java.util.Map; import java.util.Set; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; /** * Unit tests of AppLogAggregatorImpl class. */ public class TestAppLogAggregatorImpl { + private static final Logger LOG = + LoggerFactory.getLogger(TestAppLogAggregatorImpl.class); private static final File LOCAL_LOG_DIR = new File("target", TestAppLogAggregatorImpl.class.getName() + "-localLogDir"); private static final File REMOTE_LOG_FILE = new File("target", @@ -102,6 +113,216 @@ public void cleanUp() throws IOException { FileUtils.deleteQuietly(REMOTE_LOG_FILE); } + @Test + public void testVerifyAndCreateRemoteDirNonExistence() + throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + final String user = "nobody"; + conf.set(YarnConfiguration.NM_LOG_DIRS, LOCAL_LOG_DIR.getAbsolutePath()); + File aNewFile = new File(String.valueOf("tmp"+System.currentTimeMillis())); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + aNewFile.getAbsolutePath()); + + boolean existsBefore = aNewFile.exists(); + assertTrue("The new file already exists!", !existsBefore); + + final LogAggregationTFileController spyFileFormat = spy( + new LogAggregationTFileController()); + spyFileFormat.initialize(conf, "TFile"); + + Context context = createContext(conf); + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + // start an application and verify user, suffix, and app dirs created + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, user, LOCAL_LOG_DIR.getAbsolutePath(), + conf, context, 1000L, deletionServiceWithExpectedFiles, + spyFileFormat, new HashSet<>()); + // set app aborted flag first + appLogAggregator.abortLogAggregation(); + appLogAggregator.run(); + + boolean existsAfter = aNewFile.exists(); + assertTrue("The new aggregate file is not successfully created", + existsAfter); + aNewFile.delete(); //housekeeping + } + + @Test + public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() + throws IOException { + final YarnConfiguration conf = new YarnConfiguration(); + final String user = "nobody"; + conf.set(YarnConfiguration.NM_LOG_DIRS, LOCAL_LOG_DIR.getAbsolutePath()); + Path aNewFile = new Path(String.valueOf("tmp"+System.currentTimeMillis())); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, aNewFile.getName()); + + final LogAggregationTFileController spyFileFormat = spy( + new LogAggregationTFileController()); + spyFileFormat.initialize(conf, "TFile"); + + Context context = createContext(conf); + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + // start an application and verify user, suffix, and app dirs created + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, user, LOCAL_LOG_DIR.getAbsolutePath(), + conf, context, 1000L, deletionServiceWithExpectedFiles, + spyFileFormat, new HashSet<>()); + // set app aborted flag first + appLogAggregator.abortLogAggregation(); + appLogAggregator.run(); + + String targetGroup = + UserGroupInformation.getLoginUser().getPrimaryGroupName(); + FileSystem fs = FileSystem.get(conf); + FileStatus fileStatus = fs.getFileStatus(aNewFile); + Assert.assertEquals("The new aggregate file is not successfully created", + fileStatus.getGroup(), targetGroup); + + fs.delete(aNewFile, true); + } + + @Test + public void testAppLogDirCreation() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + final String user = "nobody"; + final String inputSuffix = "logs-tfile"; + conf.set(YarnConfiguration.NM_LOG_DIRS, + LOCAL_LOG_DIR.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + REMOTE_LOG_FILE.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "logs"); + + FileSystem fs = FileSystem.get(conf); + final FileSystem spyFs = spy(FileSystem.get(conf)); + + final LogAggregationTFileController spyFileFormat + = new LogAggregationTFileController() { + @Override + public FileSystem getFileSystem(Configuration conf) + throws IOException { + return spyFs; + } + }; + spyFileFormat.initialize(conf, "TFile"); + + Context context = createContext(conf); + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + // start an application and verify user, suffix, and app dirs created + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + Path userDir = fs.makeQualified(new Path( + REMOTE_LOG_FILE.getAbsolutePath(), user)); + Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( + new Path(REMOTE_LOG_FILE.getAbsolutePath()), + user, inputSuffix, appId)); + Path suffixDir = bucketDir.getParent(); + Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(REMOTE_LOG_FILE.getAbsolutePath()), appId, user, + inputSuffix)); + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, user, appDir.toString(), conf, + context, 1000L, deletionServiceWithExpectedFiles, spyFileFormat, + new HashSet()); + // set app aborted flag first + appLogAggregator.abortLogAggregation(); + appLogAggregator.run(); + verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); + + // start another application and verify only app dir created + ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); + Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(REMOTE_LOG_FILE.getAbsolutePath()), appId2, user, + inputSuffix)); + final AppLogAggregatorInTest appLogAggregator2 = + createAppLogAggregator(appId2, user, appDir2.toString(), conf, + context, 1000L, deletionServiceWithExpectedFiles, spyFileFormat, + new HashSet()); + // set app aborted flag first + appLogAggregator2.abortLogAggregation(); + appLogAggregator2.run(); + verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); + + // start another application with the app dir already created and verify + // we do not try to create it again + ApplicationId appId3 = BuilderUtils.newApplicationId(2, 2); + Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(REMOTE_LOG_FILE.getAbsolutePath()), appId3, user, + inputSuffix)); + new File(appDir3.toUri().getPath()).mkdir(); + final AppLogAggregatorInTest appLogAggregator3 = + createAppLogAggregator(appId3, user, appDir3.toString(), conf, + context, 1000L, deletionServiceWithExpectedFiles, spyFileFormat, + new HashSet()); + // set app aborted flag first + appLogAggregator3.abortLogAggregation(); + appLogAggregator3.run(); + verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); + + // Verify we do not create bucket dir again + ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003); + Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( + new Path(REMOTE_LOG_FILE.getAbsolutePath()), appId4, user, + inputSuffix)); + Path bucketDir4 = appDir4.getParent(); + new File(bucketDir4.toUri().getPath()).mkdir(); + final AppLogAggregatorInTest appLogAggregator4 = + createAppLogAggregator(appId4, user, appDir4.toString(), conf, + context, 1000L, deletionServiceWithExpectedFiles, spyFileFormat, + new HashSet()); + // set app aborted flag first + appLogAggregator4.abortLogAggregation(); + appLogAggregator4.run(); + verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(appDir4), isA(FsPermission.class)); + } + + @Test + public void testAppLogDirCreationFailure() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + final String user = "nobody"; + final String inputSuffix = "logs-tfile"; + + final LogAggregationTFileController spyFileFormat = spy( + new LogAggregationTFileController()); + spyFileFormat.initialize(conf, "TFile"); + Exception e = + new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!")); + doThrow(e).when(spyFileFormat) + .createAppDir(any(String.class), any(ApplicationId.class), + any(UserGroupInformation.class)); + + Context context = createContext(conf); + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(Collections.emptySet()); + + Set invalidTokenApps = new HashSet<>(); + // start an application and verify user, suffix, and app dirs created + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, user, LOCAL_LOG_DIR.getAbsolutePath(), + conf, context, 1000L, deletionServiceWithExpectedFiles, + spyFileFormat, invalidTokenApps); + // set app aborted flag first + appLogAggregator.abortLogAggregation(); + appLogAggregator.run(); + + assertThat(invalidTokenApps).hasSize(1); + Assert.assertFalse("Aggregation should be disabled", + appLogAggregator.isAggregationEnabled()); + } + @Test public void testAggregatorWithRetentionPolicyDisabledShouldUploadAllFiles() throws Exception { @@ -238,10 +459,12 @@ public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload( format.initialize(config, "TFile"); Context context = createContext(config); + String user = "nobody"; final AppLogAggregatorInTest appLogAggregator = - createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), + createAppLogAggregator(appId, user, LOCAL_LOG_DIR.getAbsolutePath(), config, context, recoveredLogInitedTimeMillis, - deletionServiceWithExpectedFiles, format); + deletionServiceWithExpectedFiles, format, + new HashSet()); appLogAggregator.startContainerLogAggregation( new ContainerLogContext(containerId, ContainerType.TASK, 0)); // set app finished flag first @@ -278,11 +501,12 @@ private static void verifyFilesUploaded(Set filesUploaded, } private static AppLogAggregatorInTest createAppLogAggregator( - ApplicationId applicationId, String rootLogDir, + ApplicationId applicationId, String user, String rootLogDir, YarnConfiguration config, Context context, long recoveredLogInitedTimeMillis, DeletionService deletionServiceWithFilesToExpect, - LogAggregationTFileController tFileController) + LogAggregationTFileController tFileController, + Set invalidTokenApps) throws IOException { final Dispatcher dispatcher = createNullDispatcher(); @@ -299,9 +523,10 @@ private static AppLogAggregatorInTest createAppLogAggregator( final FileContext fakeLfs = mock(FileContext.class); final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); return new AppLogAggregatorInTest(dispatcher, deletionService, - config, applicationId, ugi, nodeId, dirsService, + config, applicationId, user, ugi, nodeId, dirsService, remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis, tFileController); + context, fakeLfs, recoveredLogInitedTimeMillis, tFileController, + invalidTokenApps); } /** @@ -416,16 +641,18 @@ private static Context createContext(YarnConfiguration conf) { public AppLogAggregatorInTest(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation ugi, NodeId nodeId, - LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, + ApplicationId appId, String user, UserGroupInformation ugi, + NodeId nodeId, LocalDirsHandlerService dirsHandler, + Path remoteNodeLogFileForApp, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long recoveredLogInitedTime, - LogAggregationTFileController format) throws IOException { - super(dispatcher, deletionService, conf, appId, ugi, nodeId, + LogAggregationTFileController format, + Set invalidTokenApps) throws IOException { + super(dispatcher, deletionService, conf, appId, user, ugi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, logAggregationContext, context, lfs, -1, recoveredLogInitedTime, - format); + format, invalidTokenApps); this.applicationId = appId; this.deletionService = deletionService; this.logValue = ArgumentCaptor.forClass(LogValue.class); @@ -443,21 +670,24 @@ public void testDFSQuotaExceeded() throws Exception { final YarnConfiguration config = new YarnConfiguration(); ApplicationId appId = ApplicationId.newInstance(1357543L, 1); + String user = "nobody"; // we need a LogAggregationTFileController that throws a // LogAggregationDFSException - LogAggregationTFileController format = - Mockito.mock(LogAggregationTFileController.class); + LogAggregationTFileController spyFileFormat = spy( + new LogAggregationTFileController()); + spyFileFormat.initialize(config, "TFile"); Mockito.doThrow(new LogAggregationDFSException()) - .when(format).closeWriter(); + .when(spyFileFormat).closeWriter(); NodeManager.NMContext context = (NMContext) createContext(config); context.setNMLogAggregationStatusTracker( Mockito.mock(NMLogAggregationStatusTracker.class)); final AppLogAggregatorInTest appLogAggregator = - createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), - config, context, 1000L, deletionServiceWithExpectedFiles, format); + createAppLogAggregator(appId, user, LOCAL_LOG_DIR.getAbsolutePath(), + config, context, 1000L, deletionServiceWithExpectedFiles, + spyFileFormat, new HashSet()); appLogAggregator.startContainerLogAggregation( new ContainerLogContext( @@ -476,6 +706,7 @@ public void testDFSQuotaExceeded() throws Exception { Set filesUploaded = new HashSet<>(); LogValue logValue = logValCaptor.getValue(); for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + LOG.info("nb"); filesUploaded.add(file.getAbsolutePath()); } verifyFilesUploaded(filesUploaded, Collections.emptySet()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 0e366827587..d2825e57c02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -71,11 +71,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -102,7 +100,6 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -112,7 +109,6 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; -import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -143,7 +139,6 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.eclipse.jetty.util.MultiException; import com.google.common.base.Supplier; @@ -555,7 +550,7 @@ public void testMultipleAppsLogAggregation() throws Exception { } @Test - public void testVerifyAndCreateRemoteDirsFailure() + public void testVerifyAndCreateRemoteDirsFailureWithInitedSuccess() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, @@ -593,203 +588,17 @@ public LogAggregationFileController getLogAggregationFileController( this.user, null, this.acls, contextWithAMAndFailed)); dispatcher.await(); - // Verify that it failed + // Verify that it worked ApplicationEvent[] expectedEvents = new ApplicationEvent[] { new ApplicationEvent(appId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) - }; - checkEvents(appEventHandler, expectedEvents, false, - "getType", "getApplicationID", "getDiagnostic"); - - Mockito.reset(logAggregationService); - - // Now try to start another one - ApplicationId appId2 = - BuilderUtils.newApplicationId(System.currentTimeMillis(), - (int) (Math.random() * 1000)); - File appLogDir = - new File(localLogDir, appId2.toString()); - appLogDir.mkdir(); - logAggregationService.handle(new LogHandlerAppStartedEvent(appId2, - this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); - - // Verify that it worked - expectedEvents = new ApplicationEvent[] { - new ApplicationEvent(appId, // original failure - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), - new ApplicationEvent(appId2, // success ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - - logAggregationService.stop(); - } - @Test - public void testVerifyAndCreateRemoteDirNonExistence() - throws Exception { - this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); - File aNewFile = new File(String.valueOf("tmp"+System.currentTimeMillis())); - this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - aNewFile.getAbsolutePath()); - - LogAggregationService logAggregationService = spy( - new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler)); - logAggregationService.init(this.conf); - logAggregationService.start(); - boolean existsBefore = aNewFile.exists(); - assertTrue("The new file already exists!", !existsBefore); - - ApplicationId appId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - LogAggregationContext contextWithAMAndFailed = - Records.newRecord(LogAggregationContext.class); - contextWithAMAndFailed.setLogAggregationPolicyClassName( - AMOrFailedContainerLogAggregationPolicy.class.getName()); - logAggregationService.handle(new LogHandlerAppStartedEvent(appId, - this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); - - boolean existsAfter = aNewFile.exists(); - assertTrue("The new aggregate file is not successfully created", existsAfter); - aNewFile.delete(); //housekeeping logAggregationService.stop(); } - @Test - public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() - throws IOException { - this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); - Path aNewFile = new Path(String.valueOf("tmp"+System.currentTimeMillis())); - this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, aNewFile.getName()); - - LogAggregationService logAggregationService = new LogAggregationService( - dispatcher, this.context, this.delSrvc, super.dirsHandler); - logAggregationService.init(this.conf); - logAggregationService.start(); - - ApplicationId appId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - LogAggregationContext contextWithAMAndFailed = - Records.newRecord(LogAggregationContext.class); - contextWithAMAndFailed.setLogAggregationPolicyClassName( - AMOrFailedContainerLogAggregationPolicy.class.getName()); - logAggregationService.handle(new LogHandlerAppStartedEvent(appId, - this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); - - String targetGroup = - UserGroupInformation.getLoginUser().getPrimaryGroupName(); - FileSystem fs = FileSystem.get(this.conf); - FileStatus fileStatus = fs.getFileStatus(aNewFile); - Assert.assertEquals("The new aggregate file is not successfully created", - fileStatus.getGroup(), targetGroup); - - fs.delete(aNewFile, true); - logAggregationService.stop(); - } - - - @Test - public void testAppLogDirCreation() throws Exception { - final String inputSuffix = "logs-tfile"; - this.conf.set(YarnConfiguration.NM_LOG_DIRS, - localLogDir.getAbsolutePath()); - this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - this.remoteRootLogDir.getAbsolutePath()); - this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "logs"); - - InlineDispatcher dispatcher = new InlineDispatcher(); - dispatcher.init(this.conf); - dispatcher.start(); - - FileSystem fs = FileSystem.get(this.conf); - final FileSystem spyFs = spy(FileSystem.get(this.conf)); - - final LogAggregationTFileController spyFileFormat - = new LogAggregationTFileController() { - @Override - public FileSystem getFileSystem(Configuration conf) - throws IOException { - return spyFs; - } - }; - spyFileFormat.initialize(conf, "TFile"); - LogAggregationService aggSvc = new LogAggregationService(dispatcher, - this.context, this.delSrvc, super.dirsHandler) { - @Override - public LogAggregationFileController getLogAggregationFileController( - Configuration conf) { - return spyFileFormat; - } - }; - aggSvc.init(this.conf); - aggSvc.start(); - - // start an application and verify user, suffix, and app dirs created - ApplicationId appId = BuilderUtils.newApplicationId(1, 1); - Path userDir = fs.makeQualified(new Path( - remoteRootLogDir.getAbsolutePath(), this.user)); - Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( - new Path(remoteRootLogDir.getAbsolutePath()), - this.user, inputSuffix, appId)); - Path suffixDir = bucketDir.getParent(); - Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( - new Path(remoteRootLogDir.getAbsolutePath()), appId, - this.user, inputSuffix)); - LogAggregationContext contextWithAllContainers = - Records.newRecord(LogAggregationContext.class); - contextWithAllContainers.setLogAggregationPolicyClassName( - AllContainerLogAggregationPolicy.class.getName()); - aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, - this.acls, contextWithAllContainers)); - verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); - verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); - verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class)); - verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); - - // start another application and verify only app dir created - ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); - Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( - new Path(remoteRootLogDir.getAbsolutePath()), - appId2, this.user, inputSuffix)); - aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, - this.acls, contextWithAllContainers)); - verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); - - // start another application with the app dir already created and verify - // we do not try to create it again - ApplicationId appId3 = BuilderUtils.newApplicationId(2, 2); - Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( - new Path(remoteRootLogDir.getAbsolutePath()), - appId3, this.user, inputSuffix)); - new File(appDir3.toUri().getPath()).mkdir(); - aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, - this.acls, contextWithAllContainers)); - verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); - - - // Verify we do not create bucket dir again - ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003); - Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( - new Path(remoteRootLogDir.getAbsolutePath()), - appId4, this.user, inputSuffix)); - Path bucketDir4 = appDir4.getParent(); - new File(bucketDir4.toUri().getPath()).mkdir(); - - aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, - this.acls, contextWithAllContainers)); - verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class)); - verify(spyFs).mkdirs(eq(appDir4), isA(FsPermission.class)); - - aggSvc.stop(); - aggSvc.close(); - dispatcher.stop(); - } - @Test @SuppressWarnings("unchecked") public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { @@ -893,10 +702,11 @@ public LogAggregationFileController getLogAggregationFileController( dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent(appId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); + Thread.sleep(1000); assertThat(logAggregationService.getInvalidTokenApps()).hasSize(1); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM