diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 93437e3..6b806d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2066,6 +2066,10 @@ public static boolean isAclEnabled(Configuration conf) { = TIMELINE_SERVICE_PREFIX + "entity-file.fs-support-append"; + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_KEEP_UNDER_USER_DIR = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "keep-under-user-dir"; + /** * Settings for timeline service v2.0 */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index fc3385b..3f33611 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -145,9 +145,12 @@ public FileSystemTimelineWriter(Configuration conf, new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl, timerTaskTTL); - this.isAppendSupported = - conf.getBoolean( - YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); + this.isAppendSupported = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); + + boolean storeInsideUserDir = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_KEEP_UNDER_USER_DIR, + false); objMapper = createObjectMapper(); @@ -157,8 +160,8 @@ public FileSystemTimelineWriter(Configuration conf, YarnConfiguration .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE); - attemptDirCache = - new AttemptDirCache(attemptDirCacheSize, fs, activePath); + attemptDirCache = new AttemptDirCache(attemptDirCacheSize, fs, activePath, + authUgi, storeInsideUserDir); if (LOG.isDebugEnabled()) { StringBuilder debugMSG = new StringBuilder(); @@ -169,7 +172,7 @@ public FileSystemTimelineWriter(Configuration conf, + "=" + cleanIntervalSecs + ", " + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + "=" + ttl + ", " + - YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_KEEP_UNDER_USER_DIR + "=" + isAppendSupported + ", " + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR + "=" + activePath); @@ -946,8 +949,11 @@ private void checkAndStartTimeTasks() { private final Map attemptDirCache; private final FileSystem fs; private final Path activePath; + private final UserGroupInformation authUgi; + private final boolean storeInsideUserDir; - public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) { + public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath, + UserGroupInformation ugi, boolean storeInsideUserDir) { this.attemptDirCacheSize = cacheSize; this.attemptDirCache = new LinkedHashMap( @@ -961,6 +967,8 @@ protected boolean removeEldestEntry( }; this.fs = fs; this.activePath = activePath; + this.authUgi = ugi; + this.storeInsideUserDir = storeInsideUserDir; } public Path getAppAttemptDir(ApplicationAttemptId attemptId) @@ -993,8 +1001,8 @@ private Path createAttemptDir(ApplicationAttemptId appAttemptId) } private Path createApplicationDir(ApplicationId appId) throws IOException { - Path appDir = - new Path(activePath, appId.toString()); + Path userDir = createUserDir(authUgi.getShortUserName()); + Path appDir = new Path(userDir, appId.toString()); if (FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS))) { if (LOG.isDebugEnabled()) { @@ -1003,5 +1011,19 @@ private Path createApplicationDir(ApplicationId appId) throws IOException { } return appDir; } + + private Path createUserDir(String user) throws IOException { + if (!storeInsideUserDir) { + return activePath; + } + Path userDir = new Path(activePath, user); + if (FileSystem.mkdirs(fs, userDir, + new FsPermission(APP_LOG_DIR_PERMISSIONS))) { + if (LOG.isDebugEnabled()) { + LOG.debug("New user directory created - " + userDir); + } + } + return userDir; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java index d3826e1..1096184 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java @@ -59,25 +59,30 @@ private static FileContext localFS; private static File localActiveDir; private TimelineWriter spyTimelineWriter; + private UserGroupInformation authUgi; @Before public void setup() throws Exception { localFS = FileContext.getLocalFSFileContext(); localActiveDir = new File("target", this.getClass().getSimpleName() + "-activeDir") - .getAbsoluteFile(); + .getAbsoluteFile(); localFS.delete(new Path(localActiveDir.getAbsolutePath()), true); localActiveDir.mkdir(); LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath()); + authUgi = UserGroupInformation.getCurrentUser(); + } + + private YarnConfiguration getConfigurations() { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f); conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, - localActiveDir.getAbsolutePath()); + localActiveDir.getAbsolutePath()); conf.set( - YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, - "summary_type"); - client = createTimelineClient(conf); + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, + "summary_type"); + return conf; } @After @@ -90,6 +95,21 @@ public void tearDown() throws Exception { @Test public void testPostEntities() throws Exception { + client = createTimelineClient(getConfigurations()); + verifyForPostEntities(false); + } + + @Test + public void testPostEntitiesToKeepUnderUserDir() throws Exception { + YarnConfiguration conf = getConfigurations(); + conf.setBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_KEEP_UNDER_USER_DIR, + true); + client = createTimelineClient(conf); + verifyForPostEntities(true); + } + + private void verifyForPostEntities(boolean storeInsideUserDir) { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); TimelineEntityGroupId groupId = @@ -118,7 +138,8 @@ public void testPostEntities() throws Exception { entityTDB[0] = entities[0]; verify(spyTimelineWriter, times(1)).putEntities(entityTDB); Assert.assertTrue(localFS.util().exists( - new Path(getAppAttemptDir(attemptId1), "summarylog-" + new Path(getAppAttemptDir(attemptId1, storeInsideUserDir), + "summarylog-" + attemptId1.toString()))); reset(spyTimelineWriter); @@ -132,13 +153,16 @@ public void testPostEntities() throws Exception { verify(spyTimelineWriter, times(0)).putEntities( any(TimelineEntity[].class)); Assert.assertTrue(localFS.util().exists( - new Path(getAppAttemptDir(attemptId2), "summarylog-" + new Path(getAppAttemptDir(attemptId2, storeInsideUserDir), + "summarylog-" + attemptId2.toString()))); Assert.assertTrue(localFS.util().exists( - new Path(getAppAttemptDir(attemptId2), "entitylog-" + new Path(getAppAttemptDir(attemptId2, storeInsideUserDir), + "entitylog-" + groupId.toString()))); Assert.assertTrue(localFS.util().exists( - new Path(getAppAttemptDir(attemptId2), "entitylog-" + new Path(getAppAttemptDir(attemptId2, storeInsideUserDir), + "entitylog-" + groupId2.toString()))); reset(spyTimelineWriter); } catch (Exception e) { @@ -148,6 +172,21 @@ public void testPostEntities() throws Exception { @Test public void testPutDomain() { + client = createTimelineClient(getConfigurations()); + verifyForPutDomain(false); + } + + @Test + public void testPutDomainToKeepUnderUserDir() { + YarnConfiguration conf = getConfigurations(); + conf.setBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_KEEP_UNDER_USER_DIR, + true); + client = createTimelineClient(conf); + verifyForPutDomain(true); + } + + private void verifyForPutDomain(boolean storeInsideUserDir) { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationAttemptId attemptId1 = @@ -161,23 +200,33 @@ public void testPutDomain() { client.putDomain(attemptId1, domain); verify(spyTimelineWriter, times(0)).putDomain(domain); - Assert.assertTrue(localFS.util().exists( - new Path(getAppAttemptDir(attemptId1), "domainlog-" - + attemptId1.toString()))); + Assert.assertTrue(localFS.util() + .exists(new Path(getAppAttemptDir(attemptId1, storeInsideUserDir), + "domainlog-" + attemptId1.toString()))); reset(spyTimelineWriter); } catch (Exception e) { Assert.fail("Exception is not expected." + e); } } - private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) { - Path appDir = - new Path(localActiveDir.getAbsolutePath(), appAttemptId - .getApplicationId().toString()); + private Path getAppAttemptDir(ApplicationAttemptId appAttemptId, + boolean storeInsideUserDir) { + Path userDir = getUserDir(appAttemptId, storeInsideUserDir); + Path appDir = new Path(userDir, appAttemptId.getApplicationId().toString()); Path attemptDir = new Path(appDir, appAttemptId.toString()); return attemptDir; } + private Path getUserDir(ApplicationAttemptId appAttemptId, + boolean storeInsideUserDir) { + if (!storeInsideUserDir) { + return new Path(localActiveDir.getAbsolutePath()); + } + Path userDir = + new Path(localActiveDir.getAbsolutePath(), authUgi.getShortUserName()); + return userDir; + } + private static TimelineEntity generateEntity(String type) { TimelineEntity entity = new TimelineEntity(); entity.setEntityId("entity id"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 1675a48..135848a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -356,22 +356,28 @@ protected void serviceStop() throws Exception { @VisibleForTesting int scanActiveLogs() throws IOException { long startTime = Time.monotonicNow(); - RemoteIterator iter = list(activeRootPath); + int logsToScanCount = scanActiveLogs(activeRootPath); + metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime); + return logsToScanCount; + } + + int scanActiveLogs(Path dir) throws IOException { + RemoteIterator iter = list(dir); int logsToScanCount = 0; while (iter.hasNext()) { FileStatus stat = iter.next(); String name = stat.getPath().getName(); ApplicationId appId = parseApplicationId(name); + // assumption is user directory is exist. if (appId != null) { LOG.debug("scan logs for {} in {}", appId, stat.getPath()); logsToScanCount++; AppLogs logs = getAndSetActiveLog(appId, stat.getPath()); executor.execute(new ActiveLogParser(logs)); } else { - LOG.debug("Unable to parse entry {}", name); + logsToScanCount += scanActiveLogs(stat.getPath()); } } - metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime); return logsToScanCount; } @@ -418,6 +424,18 @@ private AppLogs getAndSetAppLogs(ApplicationId applicationId) appDirPath = getActiveAppPath(applicationId); if (fs.exists(appDirPath)) { appState = AppState.ACTIVE; + } else { + // check for user directory inside active path + RemoteIterator iter = list(activeRootPath); + while (iter.hasNext()) { + Path child = new Path(iter.next().getPath().getName(), + applicationId.toString()); + appDirPath = new Path(activeRootPath, child); + if (fs.exists(appDirPath)) { + appState = AppState.ACTIVE; + break; + } + } } } if (appState != AppState.UNKNOWN) {