diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1824453..6f3e30c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1724,6 +1724,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT = 5*60; + public static final String + TIMELINE_SERVICE_CLIENT_TIMER_TASK_RETAIN_SECS = + TIMELINE_SERVICE_CLIENT_PREFIX + "timer-task-retain-secs"; + public static final long + TIMELINE_SERVICE_CLIENT_TIMER_TASK_RETAIN_SECS_DEFAULT = 2 * 60; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index 1c295e1..2dd0bd4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -32,6 +32,9 @@ import java.util.TimerTask; import java.util.Map.Entry; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -154,8 +157,14 @@ public FileSystemTimelineWriter(Configuration conf, YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS, YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT); + long timerTaskTTL = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_TIMER_TASK_RETAIN_SECS, + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_TIMER_TASK_RETAIN_SECS_DEFAULT); + logFDsCache = - new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl); + new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl, + timerTaskTTL); this.isAppendSupported = conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); @@ -424,10 +433,8 @@ protected void updateLastModifiedTime(long updatedTime) { private Map summanyLogFDs; private Map> entityLogFDs; - private Timer flushTimer; - private FlushTimerTask flushTimerTask; - private Timer cleanInActiveFDsTimer; - private CleanInActiveFDsTask cleanInActiveFDsTask; + private Timer flushTimer = null; + private Timer cleanInActiveFDsTimer = null; private final long ttl; private final ReentrantLock domainFDLocker = new ReentrantLock(); private final ReentrantLock summaryTableLocker = new ReentrantLock(); @@ -435,27 +442,28 @@ protected void updateLastModifiedTime(long updatedTime) { private final ReentrantLock summaryTableCopyLocker = new ReentrantLock(); private final ReentrantLock entityTableCopyLocker = new ReentrantLock(); private volatile boolean serviceStopped = false; + private volatile boolean timerTaskStarted = false; + private final ReentrantLock timerTaskLocker = new ReentrantLock(); + private final long flushIntervalSecs; + private final long cleanIntervalSecs; + private final long timerTaskRetainTTL; + private volatile long timeStampOfLastWrite = System.currentTimeMillis(); + private final ReadLock timerTasksMonitorReadLock; + private final WriteLock timerTasksMonitorWriteLock; public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, - long ttl) { + long ttl, long timerTaskRetainTTL) { domainLogFD = null; summanyLogFDs = new HashMap(); entityLogFDs = new HashMap>(); - this.flushTimer = - new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", - true); - this.flushTimerTask = new FlushTimerTask(); - this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000, - flushIntervalSecs * 1000); - - this.cleanInActiveFDsTimer = - new Timer(LogFDsCache.class.getSimpleName() + - "cleanInActiveFDsTimer", true); - this.cleanInActiveFDsTask = new CleanInActiveFDsTask(); - this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask, - cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); this.ttl = ttl * 1000; + this.flushIntervalSecs = flushIntervalSecs; + this.cleanIntervalSecs = cleanIntervalSecs; + this.timerTaskRetainTTL = timerTaskRetainTTL * 1000; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.timerTasksMonitorReadLock = lock.readLock(); + this.timerTasksMonitorWriteLock = lock.writeLock(); } @Override @@ -623,13 +631,58 @@ public void run() { } } + private class TimerMonitorTask extends TimerTask { + @Override + public void run() { + try { + timerTasksMonitorWriteLock.lock(); + monitorTimerTasks(); + } finally { + timerTasksMonitorWriteLock.unlock(); + } + } + } + + private void monitorTimerTasks() { + if (System.currentTimeMillis() - this.timeStampOfLastWrite + >= this.timerTaskRetainTTL) { + if (flushTimer != null) { + flushTimer.cancel(); + flushTimer = null; + } + + if (cleanInActiveFDsTimer != null) { + cleanInActiveFDsTimer.cancel(); + cleanInActiveFDsTimer = null; + } + + closeSummaryFDs(summanyLogFDs); + + closeEntityFDs(entityLogFDs); + + timerTaskStarted = false; + } else { + if (this.cleanInActiveFDsTimer != null) { + this.cleanInActiveFDsTimer.schedule(new TimerMonitorTask(), + this.timerTaskRetainTTL); + } + } + } + @Override public void close() throws IOException { serviceStopped = true; - flushTimer.cancel(); - cleanInActiveFDsTimer.cancel(); + if (flushTimer != null) { + flushTimer.cancel(); + flushTimer = null; + } + + if (cleanInActiveFDsTimer != null) { + cleanInActiveFDsTimer.cancel(); + cleanInActiveFDsTimer = null; + } try { this.domainFDLocker.lock(); @@ -696,6 +749,7 @@ private void closeSummaryFDs( public void writeDomainLog(FileSystem fs, Path logPath, ObjectMapper objMapper, TimelineDomain domain, boolean isAppendSupported) throws IOException { + checkAndStartTimeTasks(); try { this.domainFDLocker.lock(); if (this.domainLogFD != null) { @@ -714,6 +768,7 @@ public void writeEntityLogs(FileSystem fs, Path entityLogPath, ObjectMapper objMapper, ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, List entitiesToEntity, boolean isAppendSupported) throws IOException{ + checkAndStartTimeTasks(); writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId, groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs); } @@ -788,6 +843,7 @@ public void writeSummaryEntityLogs(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, List entities, boolean isAppendSupported) throws IOException { + checkAndStartTimeTasks(); writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities, isAppendSupported, this.summanyLogFDs); } @@ -843,5 +899,42 @@ private void createSummaryFDAndWrite(FileSystem fs, Path logPath, summaryTableLocker.unlock(); } } + + private void createAndStartTimerTasks() { + this.flushTimer = + new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", + true); + this.flushTimer.schedule(new FlushTimerTask(), flushIntervalSecs * 1000, + flushIntervalSecs * 1000); + + this.cleanInActiveFDsTimer = + new Timer(LogFDsCache.class.getSimpleName() + + "cleanInActiveFDsTimer", true); + this.cleanInActiveFDsTimer.schedule(new CleanInActiveFDsTask(), + cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); + + this.cleanInActiveFDsTimer.schedule(new TimerMonitorTask(), + this.timerTaskRetainTTL); + } + + private void checkAndStartTimeTasks() { + try { + this.timerTasksMonitorReadLock.lock(); + this.timeStampOfLastWrite = System.currentTimeMillis(); + if(!timerTaskStarted) { + try { + timerTaskLocker.lock(); + if (!timerTaskStarted) { + createAndStartTimerTasks(); + timerTaskStarted = true; + } + } finally { + timerTaskLocker.unlock(); + } + } + } finally { + this.timerTasksMonitorReadLock.unlock(); + } + } } }