diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 37c81ec..496e912 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1677,11 +1677,35 @@ private static void addDeprecatedKeys() {
   public static final boolean
       DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO = true;
 
-  /** URI for FileSystemApplicationHistoryStore */
+  /** URI for FileSystemApplicationHistoryStore. */
   @Private
   public static final String FS_APPLICATION_HISTORY_STORE_URI =
       APPLICATION_HISTORY_PREFIX + "fs-history-store.uri";
 
+  /** Whether FileSystemApplicationHistoryStore purges expired history. */
+  @Private
+  public static final String FS_APPLICATION_HISTORY_STORE_ENABLE_TTL =
+      APPLICATION_HISTORY_PREFIX + "ttl-enable";
+  @Private
+  public static final boolean DEFAULT_FS_APPLICATION_HISTORY_STORE_ENABLE_TTL
+      = true;
+
+  /** Age in ms after which finished application history is purged. */
+  @Private
+  public static final String FS_APPLICATION_HISTORY_STORE_TTL_MS =
+      APPLICATION_HISTORY_PREFIX + "ttl-ms";
+  @Private
+  public static final long DEFAULT_FS_APPLICATION_HISTORY_STORE_TTL_MS
+      = 1000L * 60 * 60 * 24;
+
+  /** Pause in ms between two purge cycles of the history store. */
+  @Private
+  public static final String FS_APPLICATION_HISTORY_STORE_TTL_INTERVAL_MS =
+      APPLICATION_HISTORY_PREFIX + "ttl-interval-ms";
+  @Private
+  public static final long DEFAULT_FS_APPLICATION_HISTORY_STORE_TTL_INTERVAL_MS
+      = 1000L * 60 * 1;
+
   /** T-file compression types used to compress history data.*/
   @Private
   public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
index 6d76864..50124e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
 import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
@@ -103,6 +105,8 @@
   private FileSystem fs;
   private Path rootDirPath;
 
+  private Thread deletionThread;
+
   private ConcurrentMap outstandingWriters =
       new ConcurrentHashMap();
 
@@ -133,11 +137,29 @@ public void serviceStart() throws Exception {
       LOG.error("Error when initializing FileSystemHistoryStorage", e);
       throw e;
     }
+
+    if (conf.getBoolean(
+        YarnConfiguration.FS_APPLICATION_HISTORY_STORE_ENABLE_TTL,
+        YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_ENABLE_TTL)) {
+      deletionThread = new EntityDeletionThread(conf);
+      deletionThread.start();
+    }
+
     super.serviceStart();
   }
 
   @Override
   public void serviceStop() throws Exception {
+    if (deletionThread != null) {
+      deletionThread.interrupt();
+      LOG.info("Waiting for deletion thread to complete its current action");
+      try {
+        deletionThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for deletion thread to complete,"
+            + " continuing shutdown", e);
+      }
+    }
     try {
       for (Entry entry : outstandingWriters
           .entrySet()) {
@@ -150,9 +172,74 @@ public void serviceStop() throws Exception {
     super.serviceStop();
   }
 
+  /**
+   * Background thread that periodically purges the history files of
+   * applications that finished more than the configured ttl ago.
+   */
+  private class EntityDeletionThread extends Thread {
+    private final long ttl;
+    private final long ttlInterval;
+
+    public EntityDeletionThread(Configuration conf) {
+      super("FileSystemApplicationHistoryStore deletion thread");
+      ttl = conf.getLong(
+          YarnConfiguration.FS_APPLICATION_HISTORY_STORE_TTL_MS,
+          YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_TTL_MS);
+      ttlInterval = conf.getLong(
+          YarnConfiguration.FS_APPLICATION_HISTORY_STORE_TTL_INTERVAL_MS,
+          YarnConfiguration
+              .DEFAULT_FS_APPLICATION_HISTORY_STORE_TTL_INTERVAL_MS);
+      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle "
+          + "interval " + ttlInterval);
+    }
+
+    /**
+     * Runs one purge cycle, sleeps for the configured interval and repeats
+     * until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+      while (true) {
+        long timestamp = System.currentTimeMillis() - ttl;
+        try {
+          try {
+            discardOldEntities(timestamp);
+          } catch (IOException e) {
+            LOG.error("Error while discarding old history data", e);
+          } catch (RuntimeException e) {
+            // Keep the thread alive; a failed cycle is retried next time.
+            LOG.error("Unexpected error while discarding old history data",
+                e);
+          }
+          // Sleep after failed cycles too, so that a persistent failure
+          // cannot turn this loop into a busy spin.
+          Thread.sleep(ttlInterval);
+        } catch (InterruptedException e) {
+          LOG.info("Deletion thread received interrupt, exiting");
+          break;
+        }
+      }
+    }
+  }
+
   @Override
   public ApplicationHistoryData getApplication(ApplicationId appId)
       throws IOException {
+    return getApplication(appId, true);
+  }
+
+  /**
+   * Reads the history data of an application, optionally without the
+   * INFO message logged after a completed read.
+   *
+   * @param appId the application whose history is read
+   * @param enableInfoLog whether to log the completed read at INFO level
+   * @return the history data that was read
+   * @throws IOException on failure to read the history
+   */
+  public ApplicationHistoryData getApplication(ApplicationId appId,
+      boolean enableInfoLog)
+      throws IOException {
     HistoryFileReader hfReader = getHistoryFileReader(appId);
     try {
       boolean readStartData = false;
@@ -186,7 +273,10 @@ public ApplicationHistoryData getApplication(ApplicationId appId)
       if (!readFinishData) {
         LOG.warn("Finish information is missing for application " + appId);
       }
-      LOG.info("Completed reading history information of application " + appId);
+      if (enableInfoLog) {
+        LOG.info("Completed reading history information of application "
+            + appId);
+      }
       return historyData;
     } catch (IOException e) {
       LOG.error("Error when reading history file of application " + appId, e);
@@ -792,4 +882,59 @@ public void readFields(DataInput in) throws IOException {
       suffix = in.readUTF();
     }
   }
+
+  /**
+   * Discards the history of applications that reached a final state and
+   * whose finish timestamp is less than or equal to the given timestamp.
+   *
+   * @param timestamp the cutoff, in milliseconds since the epoch
+   * @throws IOException if the history root directory cannot be listed
+   * @throws InterruptedException if the calling thread is interrupted
+   *           between two history files
+   */
+  @VisibleForTesting
+  void discardOldEntities(long timestamp)
+      throws IOException, InterruptedException {
+    FileStatus[] files = fs.listStatus(rootDirPath);
+    for (FileStatus file : files) {
+      // Honour a stop request between two files instead of finishing the
+      // whole scan first.
+      if (Thread.interrupted()) {
+        throw new InterruptedException(
+            "Interrupted while discarding old history data");
+      }
+      ApplicationId appId =
+          ConverterUtils.toApplicationId(file.getPath().getName());
+      try {
+        ApplicationHistoryData historyData = getApplication(appId, false);
+        if (historyData == null
+            || !isFinalState(historyData.getYarnApplicationState())
+            || historyData.getFinishTime() > timestamp) {
+          continue;
+        }
+        if (fs.delete(file.getPath(), false)) {
+          LOG.info("Deleted generic history information of application "
+              + appId);
+        } else {
+          LOG.warn("Failed to delete generic history information of"
+              + " application " + appId);
+        }
+      } catch (IOException e) {
+        // Eat the exception so that the remaining applications are still
+        // processed.
+        LOG.error("Failed to purge history information of application "
+            + appId, e);
+      }
+    }
+  }
+
+  /**
+   * Tells whether an application state is one that is purged, i.e. FAILED,
+   * FINISHED or KILLED. A null state is not final.
+   */
+  private static boolean isFinalState(YarnApplicationState state) {
+    return state == YarnApplicationState.FAILED
+        || state == YarnApplicationState.FINISHED
+        || state == YarnApplicationState.KILLED;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
index c91d9f5..71c25bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
@@ -98,6 +98,16 @@ public void testReadWriteHistoryData() throws IOException {
     testReadHistoryData(5);
   }
 
+  @Test
+  public void testPurgeHistoryData() throws IOException, InterruptedException {
+    testWriteHistoryData(5);
+    Assert.assertEquals(5, store.getAllApplications().size());
+    FileSystemApplicationHistoryStore astore =
+        (FileSystemApplicationHistoryStore) store;
+    astore.discardOldEntities(0);
+    Assert.assertEquals(0, store.getAllApplications().size());
+  }
+
   private void testWriteHistoryData(int num) throws IOException {
     testWriteHistoryData(num, false, false);
   }