diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index febf095..9e950b0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -751,6 +751,19 @@
YARN_PREFIX + "app.container.log.filesize";
////////////////////////////////
+  // AHS Configs
+  ////////////////////////////////
+
+  public static final String AHS_PREFIX = YARN_PREFIX + "ahs.";
+
+  /** URI for FileSystemApplicationHistoryStore. */
+  public static final String FS_HISTORY_STORE_URI =
+      AHS_PREFIX + "fs-history-store.uri";
+
+  /** T-file compression types used to compress history data. */
+  public static final String FS_HISTORY_STORE_COMPRESSION_TYPE =
+      AHS_PREFIX + "fs-history-store.compression-type";
+  public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE =
+      "none";
+ ////////////////////////////////
// Other Configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index ab8d50a..f8979dd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -755,6 +755,23 @@
$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*
+
+  <property>
+    <description>
+      URI pointing to the location of the FileSystem path where
+      the history will be persisted. This must be supplied when using
+      org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
+      as the value for yarn.resourcemanager.ahs.writer.class
+    </description>
+    <name>yarn.ahs.fs-history-store.uri</name>
+    <value>${hadoop.log.dir}/yarn/system/ahstore</value>
+  </property>
+
+  <property>
+    <description>T-file compression types used to compress history data.</description>
+    <name>yarn.ahs.fs-history-store.compression-type</name>
+    <value>none</value>
+  </property>
+
The interval of the yarn client's querying application state
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 33350b4..e3d9bd9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -40,5 +40,11 @@
org.apache.hadoop
hadoop-yarn-server-common
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..2b94c6e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -0,0 +1,862 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * File system implementation of {@link ApplicationHistoryStore}. In this
+ * implementation, one application will have just one file in the file system,
+ * which contains all the history data of one application, and its attempts and
+ * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to
+ * be invoked first when writing any history data of one application and it will
+ * open a file, while {@link #applicationFinished(ApplicationFinishData)} is
+ * supposed to be last writing operation and will close the file.
+ */
+@Public
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+ implements ApplicationHistoryStore {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemApplicationHistoryStore.class);
+
+ private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+ private static final int MIN_BLOCK_SIZE = 256 * 1024;
+ private static final String START_DATA_SUFFIX = "_start";
+ private static final String FINISH_DATA_SUFFIX = "_finish";
+ private static final FsPermission ROOT_DIR_UMASK =
+ FsPermission.createImmutable((short) 0740);
+ private static final FsPermission HISTORY_FILE_UMASK =
+ FsPermission.createImmutable((short) 0640);
+
+  private FileSystem fs;
+  private Path rootDirPath;
+
+  // Writers for applications whose history files are currently open,
+  // i.e. applicationStarted() has been seen but applicationFinished() not.
+  // Generic type parameters restored (stripped in the patch text).
+  private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters =
+      new ConcurrentHashMap<ApplicationId, HistoryFileWriter>();
+
+ public FileSystemApplicationHistoryStore() {
+ super(FileSystemApplicationHistoryStore.class.getName());
+ }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    // Resolve the configured working directory and create the root
+    // directory for history files with restrictive permissions.
+    Path fsWorkingPath =
+        new Path(conf.get(YarnConfiguration.FS_HISTORY_STORE_URI));
+    rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+    try {
+      fs = fsWorkingPath.getFileSystem(conf);
+      fs.mkdirs(rootDirPath);
+      fs.setPermission(rootDirPath, ROOT_DIR_UMASK);
+    } catch (IOException e) {
+      LOG.error("Error when initializing FileSystemHistoryStorage", e);
+      throw e;
+    }
+    super.serviceInit(conf);
+  }
+
+ @Override
+ public void serviceStop() throws Exception {
+ try {
+ boolean flag = false;
+ for (Entry entry : outstandingWriters
+ .entrySet()) {
+ try {
+ entry.getValue().close();
+ LOG.info("Closed history file of application " + entry.getKey());
+ } catch (IOException e) {
+ LOG.error("Error when closing history file of application "
+ + entry.getKey());
+ flag = true;
+ }
+ }
+ if (flag) {
+ throw new IOException("Error when closing history files");
+ }
+ outstandingWriters.clear();
+ } finally {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Error when stopping FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+ }
+ super.serviceStop();
+ }
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId)
+      throws IOException {
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    if (hfReader == null) {
+      return null;
+    }
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ApplicationHistoryData historyData =
+          ApplicationHistoryData.newInstance(
+              appId, null, null, null, null, Long.MIN_VALUE, Long.MIN_VALUE,
+              Long.MAX_VALUE, null, FinalApplicationStatus.UNDEFINED, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.equals(appId + START_DATA_SUFFIX)) {
+          ApplicationStartData startData =
+              new ApplicationStartDataPBImpl(
+                  ApplicationStartDataProto.parseFrom(entry.value));
+          mergeApplicationHistoryData(historyData, startData, null);
+          // Fixed: was mistakenly set to false, which kept the loop
+          // scanning after the start record had been read and made the
+          // "start information missing" warning fire spuriously.
+          readStartData = true;
+        } else if (entry.key.equals(appId + FINISH_DATA_SUFFIX)) {
+          ApplicationFinishData finishData =
+              new ApplicationFinishDataPBImpl(
+                  ApplicationFinishDataProto.parseFrom(entry.value));
+          mergeApplicationHistoryData(historyData, null, finishData);
+          readFinishData = true;
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for application " + appId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for application " + appId);
+      }
+      LOG.info("Completed reading history information of application " + appId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application " + appId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException {
+    // Generic type parameters restored (stripped in the patch text).
+    Map<ApplicationId, ApplicationHistoryData> historyDataMap =
+        new HashMap<ApplicationId, ApplicationHistoryData>();
+    FileStatus[] files = fs.listStatus(rootDirPath);
+    for (FileStatus file : files) {
+      ApplicationId appId =
+          ConverterUtils.toApplicationId(file.getPath().getName());
+      try {
+        ApplicationHistoryData historyData = getApplication(appId);
+        if (historyData != null) {
+          historyDataMap.put(appId, historyData);
+        }
+      } catch (IOException e) {
+        // Swallow deliberately so that one unreadable history file does not
+        // prevent the remaining applications from being returned.
+        LOG.error("History information of application " + appId
+            + " is not included into the result due to the exception", e);
+      }
+    }
+    return historyDataMap;
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    // Generic type parameters restored (stripped in the patch text);
+    // "new HashMap>()" as given would not compile.
+    Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
+        new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+    // Pairs up the start/finish records of each attempt found in the file.
+    Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData,
+        ApplicationAttemptFinishData>> startFinshDataMap =
+        new HashMap<ApplicationAttemptId, StartFinishDataPair<
+            ApplicationAttemptStartData, ApplicationAttemptFinishData>>();
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    if (hfReader == null) {
+      return historyDataMap;
+    }
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.startsWith(ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
+          if (entry.key.endsWith(START_DATA_SUFFIX)) {
+            retriveStartFinishData(appId, entry, startFinshDataMap, true);
+          } else {
+            retriveStartFinishData(appId, entry, startFinshDataMap, false);
+          }
+        }
+      }
+      LOG.info("Completed reading history information of all application"
+          + " attempts of application " + appId);
+    } catch (IOException e) {
+      LOG.info("Error when reading history information of some application"
+          + " attempts of application " + appId);
+    } finally {
+      hfReader.close();
+    }
+    for (Map.Entry<ApplicationAttemptId, StartFinishDataPair<
+        ApplicationAttemptStartData, ApplicationAttemptFinishData>> entry
+        : startFinshDataMap.entrySet()) {
+      ApplicationAttemptHistoryData historyData =
+          ApplicationAttemptHistoryData.newInstance(
+              entry.getKey(), null, -1, null, null, null,
+              FinalApplicationStatus.UNDEFINED, null);
+      mergeApplicationAttemptHistoryData(historyData,
+          entry.getValue().startData, entry.getValue().finishDtata);
+      historyDataMap.put(entry.getKey(), historyData);
+    }
+    return historyDataMap;
+  }
+
+  private void retriveStartFinishData(
+      ApplicationId appId,
+      HistoryFileReader.Entry entry,
+      Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData,
+          ApplicationAttemptFinishData>> startFinshDataMap,
+      boolean start) throws IOException {
+    // Strip the suffix to recover the attempt id encoded in the key.
+    // Generic type parameters restored (stripped in the patch text).
+    ApplicationAttemptId appAttemptId =
+        ConverterUtils.toApplicationAttemptId(entry.key.substring(
+            0,
+            entry.key.length()
+                - (start ? START_DATA_SUFFIX.length() : FINISH_DATA_SUFFIX
+                    .length())));
+    // Ignore records that belong to a different application.
+    if (appAttemptId.getApplicationId().equals(appId)) {
+      StartFinishDataPair<ApplicationAttemptStartData,
+          ApplicationAttemptFinishData> pair =
+          startFinshDataMap.get(appAttemptId);
+      if (pair == null) {
+        pair = new StartFinishDataPair<ApplicationAttemptStartData,
+            ApplicationAttemptFinishData>();
+        startFinshDataMap.put(appAttemptId, pair);
+      }
+      if (start) {
+        pair.startData = new ApplicationAttemptStartDataPBImpl(
+            ApplicationAttemptStartDataProto.parseFrom(entry.value));
+      } else {
+        pair.finishDtata = new ApplicationAttemptFinishDataPBImpl(
+            ApplicationAttemptFinishDataProto.parseFrom(entry.value));
+      }
+    }
+  }
+
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    if (hfReader == null) {
+      return null;
+    }
+    try {
+      // Scan the history file until both the start and the finish records
+      // of this attempt have been seen, or the file is exhausted.
+      boolean startSeen = false;
+      boolean finishSeen = false;
+      String startKey = appAttemptId + START_DATA_SUFFIX;
+      String finishKey = appAttemptId + FINISH_DATA_SUFFIX;
+      ApplicationAttemptHistoryData historyData =
+          ApplicationAttemptHistoryData.newInstance(
+              appAttemptId, null, -1, null, null, null,
+              FinalApplicationStatus.UNDEFINED, null);
+      while ((!startSeen || !finishSeen) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (startKey.equals(entry.key)) {
+          ApplicationAttemptStartData startData =
+              new ApplicationAttemptStartDataPBImpl(
+                  ApplicationAttemptStartDataProto.parseFrom(entry.value));
+          mergeApplicationAttemptHistoryData(historyData, startData, null);
+          startSeen = true;
+        } else if (finishKey.equals(entry.key)) {
+          ApplicationAttemptFinishData finishData =
+              new ApplicationAttemptFinishDataPBImpl(
+                  ApplicationAttemptFinishDataProto.parseFrom(entry.value));
+          mergeApplicationAttemptHistoryData(historyData, null, finishData);
+          finishSeen = true;
+        }
+      }
+      if (!startSeen && !finishSeen) {
+        return null;
+      }
+      if (!startSeen) {
+        LOG.warn("Start information is missing for application attempt "
+            + appAttemptId);
+      }
+      if (!finishSeen) {
+        LOG.warn("Finish information is missing for application attempt "
+            + appAttemptId);
+      }
+      LOG.info("Completed reading history information of application attempt "
+          + appAttemptId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application attempt"
+          + appAttemptId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId)
+      throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(containerId.getApplicationAttemptId()
+            .getApplicationId());
+    if (hfReader == null) {
+      return null;
+    }
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ContainerHistoryData historyData =
+          ContainerHistoryData.newInstance(containerId, null, null, null,
+              Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE,
+              null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.equals(containerId + START_DATA_SUFFIX)) {
+          ContainerStartData startData =
+              new ContainerStartDataPBImpl(
+                  ContainerStartDataProto.parseFrom(entry.value));
+          mergeContainerHistoryData(historyData, startData, null);
+          // Fixed: was mistakenly set to false, which kept the loop
+          // scanning after the start record had been read and made the
+          // "start information missing" warning fire spuriously.
+          readStartData = true;
+        } else if (entry.key.equals(containerId + FINISH_DATA_SUFFIX)) {
+          ContainerFinishData finishData =
+              new ContainerFinishDataPBImpl(
+                  ContainerFinishDataProto.parseFrom(entry.value));
+          mergeContainerHistoryData(historyData, null, finishData);
+          readFinishData = true;
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for container " + containerId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for container " + containerId);
+      }
+      LOG.info("Completed reading history information of container "
+          + containerId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of container " + containerId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    // The AM container id is part of the attempt's history; without it
+    // there is nothing to look up.
+    ApplicationAttemptHistoryData attemptHistoryData =
+        getApplicationAttempt(appAttemptId);
+    if (attemptHistoryData == null) {
+      return null;
+    }
+    ContainerId amContainerId = attemptHistoryData.getMasterContainerId();
+    return amContainerId == null ? null : getContainer(amContainerId);
+  }
+
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    // Generic type parameters restored (stripped in the patch text);
+    // "new HashMap>()" as given would not compile.
+    Map<ContainerId, ContainerHistoryData> historyDataMap =
+        new HashMap<ContainerId, ContainerHistoryData>();
+    // Pairs up the start/finish records of each container found in the file.
+    Map<ContainerId, StartFinishDataPair<ContainerStartData,
+        ContainerFinishData>> startFinshDataMap =
+        new HashMap<ContainerId, StartFinishDataPair<ContainerStartData,
+            ContainerFinishData>>();
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    if (hfReader == null) {
+      return historyDataMap;
+    }
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+          if (entry.key.endsWith(START_DATA_SUFFIX)) {
+            retriveStartFinishData(appAttemptId, entry, startFinshDataMap,
+                true);
+          } else {
+            retriveStartFinishData(appAttemptId, entry, startFinshDataMap,
+                false);
+          }
+        }
+      }
+      // Fixed typo in the log message ("conatiners").
+      LOG.info("Completed reading history information of all containers"
+          + " of application attempt " + appAttemptId);
+    } catch (IOException e) {
+      LOG.info("Error when reading history information of some containers"
+          + " of application attempt " + appAttemptId);
+    } finally {
+      hfReader.close();
+    }
+    for (Map.Entry<ContainerId, StartFinishDataPair<ContainerStartData,
+        ContainerFinishData>> entry : startFinshDataMap.entrySet()) {
+      ContainerHistoryData historyData =
+          ContainerHistoryData.newInstance(entry.getKey(), null, null, null,
+              Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE,
+              null);
+      mergeContainerHistoryData(historyData, entry.getValue().startData,
+          entry.getValue().finishDtata);
+      historyDataMap.put(entry.getKey(), historyData);
+    }
+    return historyDataMap;
+  }
+
+  private void retriveStartFinishData(
+      ApplicationAttemptId appAttemptId,
+      HistoryFileReader.Entry entry,
+      Map<ContainerId, StartFinishDataPair<ContainerStartData,
+          ContainerFinishData>> startFinshDataMap,
+      boolean start) throws IOException {
+    // Strip the suffix to recover the container id encoded in the key.
+    // Generic type parameters restored (stripped in the patch text).
+    ContainerId containerId =
+        ConverterUtils.toContainerId(entry.key.substring(
+            0,
+            entry.key.length()
+                - (start ? START_DATA_SUFFIX.length() : FINISH_DATA_SUFFIX
+                    .length())));
+    // Ignore records that belong to a different application attempt.
+    if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
+      StartFinishDataPair<ContainerStartData, ContainerFinishData> pair =
+          startFinshDataMap.get(containerId);
+      if (pair == null) {
+        pair = new StartFinishDataPair<ContainerStartData,
+            ContainerFinishData>();
+        startFinshDataMap.put(containerId, pair);
+      }
+      if (start) {
+        pair.startData = new ContainerStartDataPBImpl(
+            ContainerStartDataProto.parseFrom(entry.value));
+      } else {
+        pair.finishDtata = new ContainerFinishDataPBImpl(
+            ContainerFinishDataProto.parseFrom(entry.value));
+      }
+    }
+  }
+
+ @Override
+ public void applicationStarted(ApplicationStartData appStart)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ outstandingWriters.get(appStart.getApplicationId());
+ if (hfWriter == null) {
+ Path applicationHistoryFile =
+ new Path(rootDirPath, appStart.getApplicationId().toString());
+ try {
+ hfWriter = new HistoryFileWriter(applicationHistoryFile);
+ LOG.info("Opened history file of application "
+ + appStart.getApplicationId());
+ } catch (IOException e) {
+ LOG.error("Error when openning history file of application "
+ + appStart.getApplicationId());
+ throw e;
+ }
+ outstandingWriters.put(appStart.getApplicationId(), hfWriter);
+ } else {
+ throw new IOException("History file of application "
+ + appStart.getApplicationId() + " is already opened");
+ }
+ assert appStart instanceof ApplicationStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(appStart.getApplicationId().toString()
+ + START_DATA_SUFFIX, ((ApplicationStartDataPBImpl) appStart)
+ .getProto().toByteArray());
+ LOG.info("Start information of application "
+ + appStart.getApplicationId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of application "
+ + appStart.getApplicationId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void applicationFinished(ApplicationFinishData appFinish)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appFinish.getApplicationId());
+ assert appFinish instanceof ApplicationFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(
+ appFinish.getApplicationId() + FINISH_DATA_SUFFIX,
+ ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
+ LOG.info("Finish information of application "
+ + appFinish.getApplicationId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of application "
+ + appFinish.getApplicationId());
+ throw e;
+ }
+ // Not put close() in finally block in case callers want to retry writing
+ // the data. On the other hand, the file will anyway be close when the
+ // store is stopped.
+ try {
+ hfWriter.close();
+ LOG.info("Closed history file of application "
+ + appFinish.getApplicationId());
+ } catch (IOException e) {
+ LOG.error("Error when closing history file of application "
+ + appFinish.getApplicationId());
+ throw e;
+ }
+ outstandingWriters.remove(appFinish.getApplicationId());
+ }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    ApplicationAttemptId attemptId =
+        appAttemptStart.getApplicationAttemptId();
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(attemptId.getApplicationId());
+    assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(attemptId + START_DATA_SUFFIX,
+          ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
+              .toByteArray());
+      LOG.info("Start information of application attempt " + attemptId
+          + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application attempt "
+          + attemptId);
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    ApplicationAttemptId attemptId =
+        appAttemptFinish.getApplicationAttemptId();
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(attemptId.getApplicationId());
+    assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(attemptId + FINISH_DATA_SUFFIX,
+          ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
+              .toByteArray());
+      LOG.info("Finish information of application attempt " + attemptId
+          + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application attempt "
+          + attemptId);
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    ContainerId containerId = containerStart.getContainerId();
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerId.getApplicationAttemptId()
+            .getApplicationId());
+    assert containerStart instanceof ContainerStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(containerId + START_DATA_SUFFIX,
+          ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
+      LOG.info("Start information of container " + containerId
+          + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of container "
+          + containerId);
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerFinish.getContainerId()
+            .getApplicationAttemptId().getApplicationId());
+    assert containerFinish instanceof ContainerFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(
+          containerFinish.getContainerId() + FINISH_DATA_SUFFIX,
+          ((ContainerFinishDataPBImpl) containerFinish).getProto()
+              .toByteArray());
+      LOG.info("Finish information of container "
+          + containerFinish.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of container "
+          + containerFinish.getContainerId());
+      // Fixed: the exception was silently swallowed here; rethrow so the
+      // failure is visible to callers, as every other write method does.
+      throw e;
+    }
+  }
+
+  /** Copies the fields of the start/finish records (either may be null)
+   * into the aggregate application history record. */
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData,
+      ApplicationStartData startData,
+      ApplicationFinishData finishData) {
+    if (startData != null) {
+      historyData.setApplicationName(startData.getApplicationName());
+      historyData.setApplicationType(startData.getApplicationType());
+      historyData.setQueue(startData.getQueue());
+      historyData.setUser(startData.getUser());
+      historyData.setSubmitTime(startData.getSubmitTime());
+      historyData.setStartTime(startData.getStartTime());
+    }
+    if (finishData != null) {
+      historyData.setFinishTime(finishData.getFinishTime());
+      historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+      historyData.setFinalApplicationStatus(
+          finishData.getFinalApplicationStatus());
+      historyData.setYarnApplicationState(finishData.getYarnApplicationState());
+    }
+  }
+
+  /** Copies the fields of the start/finish records (either may be null)
+   * into the aggregate attempt history record. */
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptStartData startData,
+      ApplicationAttemptFinishData finishData) {
+    if (startData != null) {
+      historyData.setHost(startData.getHost());
+      historyData.setRPCPort(startData.getRPCPort());
+      historyData.setMasterContainerId(startData.getMasterContainerId());
+    }
+    if (finishData != null) {
+      historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+      historyData.setTrackingURL(finishData.getTrackingURL());
+      historyData.setFinalApplicationStatus(
+          finishData.getFinalApplicationStatus());
+      historyData.setYarnApplicationAttemptState(
+          finishData.getYarnApplicationAttemptState());
+    }
+  }
+
+  /** Copies the fields of the start/finish records (either may be null)
+   * into the aggregate container history record. */
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerStartData startData,
+      ContainerFinishData finishData) {
+    if (startData != null) {
+      historyData.setAllocatedResource(startData.getAllocatedResource());
+      historyData.setAssignedNode(startData.getAssignedNode());
+      historyData.setPriority(startData.getPriority());
+      historyData.setStartTime(startData.getStartTime());
+    }
+    if (finishData != null) {
+      historyData.setFinishTime(finishData.getFinishTime());
+      historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+      historyData.setLogURL(finishData.getLogURL());
+      historyData.setContainerExitStatus(finishData.getContainerExitStatus());
+      historyData.setContainerState(finishData.getContainerState());
+    }
+  }
+
+  private HistoryFileWriter getHistoryFileWriter(ApplicationId appId)
+      throws IOException {
+    // A writer exists only between applicationStarted() and
+    // applicationFinished(); any other write is a caller error.
+    HistoryFileWriter hfWriter = outstandingWriters.get(appId);
+    if (hfWriter == null) {
+      throw new IOException("History file of application " + appId
+          + " is not opened");
+    }
+    return hfWriter;
+  }
+
+  private HistoryFileReader getHistoryFileReader(ApplicationId appId)
+      throws IOException {
+    Path applicationHistoryFile = new Path(rootDirPath, appId.toString());
+    // No history file yet, or the file is still being written: there is
+    // nothing readable for this application.
+    if (!fs.exists(applicationHistoryFile)
+        || outstandingWriters.containsKey(appId)) {
+      return null;
+    }
+    return new HistoryFileReader(applicationHistoryFile);
+  }
+
+  /** Sequential reader over the key/value records of one history TFile. */
+  private class HistoryFileReader {
+
+    private class Entry {
+
+      private String key;
+      private byte[] value;
+
+      public Entry(String key, byte[] value) {
+        this.key = key;
+        this.value = value;
+      }
+    }
+
+    private FSDataInputStream fsdis;
+    private TFile.Reader reader;
+    private TFile.Reader.Scanner scanner;
+
+    public HistoryFileReader(Path historyFile) throws IOException {
+      // Fixed: the original declared a local "FSDataInputStream fsdis" that
+      // shadowed the field, so close() could never close the stream (leak).
+      fsdis = fs.open(historyFile);
+      reader =
+          new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
+              getConfig());
+      reset();
+    }
+
+    public boolean hasNext() {
+      return !scanner.atEnd();
+    }
+
+    public Entry next() throws IOException {
+      TFile.Reader.Scanner.Entry entry = scanner.entry();
+      DataInputStream dis = entry.getKeyStream();
+      String key = dis.readUTF();
+      dis = entry.getValueStream();
+      byte[] value = new byte[entry.getValueLength()];
+      // Fixed: read() may return fewer bytes than requested; readFully
+      // guarantees the whole value is read.
+      dis.readFully(value);
+      scanner.advance();
+      return new Entry(key, value);
+    }
+
+    public void reset() throws IOException {
+      if (scanner != null) {
+        scanner.close();
+      }
+      scanner = reader.createScanner();
+    }
+
+    public void close() throws IOException {
+      // Close scanner, reader and stream in order, each in a finally so a
+      // failure in one does not leak the others.
+      try {
+        try {
+          if (scanner != null) {
+            scanner.close();
+          }
+        } finally {
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      } finally {
+        if (fsdis != null) {
+          fsdis.close();
+        }
+      }
+    }
+  }
+
+  /** Appends key/value records for one application to its history TFile. */
+  private class HistoryFileWriter {
+
+    private FSDataOutputStream fsdos;
+    private TFile.Writer writer;
+
+    public HistoryFileWriter(Path historyFile)
+        throws IOException {
+      if (fs.exists(historyFile)) {
+        LOG.warn(historyFile.toString() + " already exists");
+      }
+      fsdos = fs.create(historyFile, true);
+      try {
+        fs.setPermission(historyFile, HISTORY_FILE_UMASK);
+        writer =
+            new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
+                YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE,
+                YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE),
+                null, getConfig());
+      } catch (IOException e) {
+        // Fixed: don't leak the open stream if writer creation fails.
+        fsdos.close();
+        throw e;
+      }
+    }
+
+    public synchronized void close() throws IOException {
+      try {
+        if (writer != null) {
+          writer.close();
+        }
+      } finally {
+        if (fsdos != null) {
+          fsdos.close();
+        }
+      }
+    }
+
+    public synchronized void writeHistoryData(String key, byte[] value)
+        throws IOException {
+      // Append the key and value as two consecutive streams; each stream
+      // must be closed before the next append is prepared.
+      DataOutputStream dos = null;
+      try {
+        dos = writer.prepareAppendKey(-1);
+        dos.writeUTF(key);
+      } finally {
+        if (dos != null) {
+          dos.close();
+        }
+      }
+      try {
+        dos = writer.prepareAppendValue(value.length);
+        dos.write(value);
+      } finally {
+        if (dos != null) {
+          dos.close();
+        }
+      }
+    }
+
+  }
+
+  /** Holder pairing the start record with the finish record of one entity.
+   * Type parameters restored (stripped in the patch text); without them
+   * the field types S and F are undefined.
+   * NOTE(review): "finishDtata" is misspelled, but it is referenced
+   * throughout this file; rename in a follow-up change. */
+  private static class StartFinishDataPair<S, F> {
+
+    private S startData;
+    private F finishDtata;
+
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..5e14688
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+// End-to-end tests for FileSystemApplicationHistoryStore backed by a
+// RawLocalFileSystem rooted at "/" and a relative working directory "Test".
+// All test methods share one static store instance and, as the comment
+// below states, are order-dependent: the read test assumes the write test
+// already ran.
+// NOTE(review): JUnit does not guarantee test-method execution order by
+// default -- consider @FixMethodOrder or merging the dependent cases.
+// NOTE(review): junit.framework.Assert (imported above) is deprecated;
+// prefer org.junit.Assert.
+public class TestFileSystemApplicationHistoryStore extends
+ ApplicationHistoryStoreTestUtils {
+
+ private static FileSystem fs;
+ private static Path fsWorkingPath;
+ private static Configuration conf;
+ private static FileSystemApplicationHistoryStore fsstore;
+
+ // Wipes any leftover working directory, points the store's URI at it,
+ // and starts a single store shared by every test method.
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ fs = new RawLocalFileSystem();
+ conf = new Configuration();
+ fs.initialize(new URI("/"), conf);
+ fsWorkingPath = new Path("Test");
+ fs.delete(fsWorkingPath, true);
+ conf.set(YarnConfiguration.FS_HISTORY_STORE_URI, fsWorkingPath.toString());
+ fsstore = new FileSystemApplicationHistoryStore();
+ fsstore.init(conf);
+ fsstore.start();
+ }
+
+ // Stops the store and removes the working directory so reruns start clean.
+ @AfterClass
+ public static void classTearDown() throws Exception {
+ fsstore.stop();
+ fs.delete(fsWorkingPath, true);
+ fs.close();
+ }
+
+ // Exposes the shared store through the base-class "store" field used by
+ // the inherited write*/read* helpers.
+ @Before
+ public void setup() {
+ store = fsstore;
+ }
+
+ // The order of the test cases matters
+ // Writes start/finish data for 5 applications, plus attempts and
+ // containers for the first application; testReadHistoryData verifies it.
+ @Test
+ public void testWriteHistoryData() throws IOException {
+ int num = 5;
+ // write application start data
+ for (int i = 1; i <= num; ++i) {
+ ApplicationId appId = ApplicationId.newInstance(0, i);
+ writeApplicationStartData(appId);
+ }
+ // write application attempt history data
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ for (int i = 1; i <= num; ++i) {
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, i);
+ writeApplicationAttemptStartData(appAttemptId);
+ writeApplicationAttemptFinishData(appAttemptId);
+ }
+ // write container history data
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ for (int i = 1; i <= num; ++i) {
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
+ writeContainerStartData(containerId);
+ writeContainerFinishData(containerId);
+ }
+ // write application finish data
+ for (int i = 1; i <= num; ++i) {
+ appId = ApplicationId.newInstance(0, i);
+ writeApplicationFinishData(appId);
+ }
+ }
+
+ // Reads back everything testWriteHistoryData persisted; the test-utils
+ // helpers encode the entity id into name/host/diagnostics fields, which
+ // is what the equality assertions below rely on.
+ @Test
+ public void testReadHistoryData() throws IOException {
+ int num = 5;
+ // read application history data
+ Assert.assertEquals(num, store.getAllApplications().size());
+ for (int i = 1; i <= num; ++i) {
+ ApplicationId appId = ApplicationId.newInstance(0, i);
+ ApplicationHistoryData data = store.getApplication(appId);
+ Assert.assertNotNull(data);
+ Assert.assertEquals(appId.toString(), data.getApplicationName());
+ Assert.assertEquals(appId.toString(), data.getDiagnosticsInfo());
+ }
+ // read application attempt history data
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ Assert.assertEquals(
+ num, store.getApplicationAttempts(appId).size());
+ for (int i = 1; i <= num; ++i) {
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, i);
+ ApplicationAttemptHistoryData data =
+ store.getApplicationAttempt(appAttemptId);
+ Assert.assertNotNull(data);
+ Assert.assertEquals(appAttemptId.toString(), data.getHost());
+ Assert.assertEquals(appAttemptId.toString(), data.getDiagnosticsInfo());
+ }
+ // read container history data
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ Assert.assertEquals(
+ num, store.getContainers(appAttemptId).size());
+ for (int i = 1; i <= num; ++i) {
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
+ ContainerHistoryData data = store.getContainer(containerId);
+ Assert.assertNotNull(data);
+ Assert.assertEquals(Priority.newInstance(containerId.getId()),
+ data.getPriority());
+ Assert.assertEquals(containerId.toString(), data.getDiagnosticsInfo());
+ }
+ ContainerHistoryData masterContainer =
+ store.getAMContainer(appAttemptId);
+ Assert.assertNotNull(masterContainer);
+ Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1),
+ masterContainer.getContainerId());
+ }
+
+ // Once an application's history file is closed by the finish write, any
+ // further write for that application must fail with an IOException whose
+ // message contains "is not opened".
+ @Test
+ public void testWriteAfterApplicationFinish() throws IOException {
+ ApplicationId appId =
+ ApplicationId.newInstance(0, store.getAllApplications().size() + 1);
+ writeApplicationStartData(appId);
+ writeApplicationFinishData(appId);
+ // write application attempt history data
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ try {
+ writeApplicationAttemptStartData(appAttemptId);
+ Assert.fail();
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("is not opened"));
+ }
+ try {
+ writeApplicationAttemptFinishData(appAttemptId);
+ Assert.fail();
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("is not opened"));
+ }
+ // write container history data
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+ try {
+ writeContainerStartData(containerId);
+ Assert.fail();
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("is not opened"));
+ }
+ try {
+ writeContainerFinishData(containerId);
+ Assert.fail();
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("is not opened"));
+ }
+ }
+
+ // Writes 100k container records for one application and asserts the
+ // on-disk growth stays under 20MB -- a coarse guard against the history
+ // format bloating. NOTE(review): appId fixed at 7 presumably to avoid
+ // colliding with ids used by the other (order-dependent) tests.
+ @Test
+ public void testMassiveWriteContainerHistoryData() throws IOException {
+ long mb = 1024 * 1024;
+ long usedDiskBefore = fs.getContentSummary(fsWorkingPath).getLength() / mb;
+ ApplicationId appId = ApplicationId.newInstance(0, 7);
+ writeApplicationStartData(appId);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ for (int i = 1; i <= 100000; ++i) {
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
+ writeContainerStartData(containerId);
+ writeContainerFinishData(containerId);
+ }
+ writeApplicationFinishData(appId);
+ long usedDiskAfter = fs.getContentSummary(fsWorkingPath).getLength() / mb;
+ Assert.assertTrue((usedDiskAfter - usedDiskBefore) < 20);
+ }
+
+}