diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 44c35c3..73b6a6c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -261,6 +261,10 @@
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.rm-state-store.uri";
+ /** URI for FileSystemHistoryStorage */
+ public static final String FS_HISTORY_STORAGE_URI = YARN_PREFIX +
+ "ahs.fs-history-storage.uri";
+
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b6753bc..f460707 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -255,6 +255,15 @@
+ URI pointing to the location of the FileSystem path where
+ the history will be persisted. This must be supplied when using
+ org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemHistoryStorage
+ as the value for yarn.resourcemanager.ahs.writer.class
+ yarn.ahs.fs-history-storage.uri
+ ${hadoop.tmp.dir}/yarn/system/ahstore
+
+
+
The maximum number of completed applications RM keeps.
yarn.resourcemanager.max-completed-applications
10000
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java
new file mode 100644
index 0000000..6f62a39
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptHistoryDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationHistoryDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerHistoryDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+
+public class FileSystemHistoryStorage extends AbstractService
+ implements ApplicationHistoryReader, ApplicationHistoryWriter {
+
+ private static final Log LOG = LogFactory.getLog(FileSystemHistoryStorage.class);
+
+ private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+ private static final String APP_HISTORY_DATA_ROOT = "ApplicationRoot";
+ private static final String APP_ATTEMPT_HISTORY_DATA_ROOT = "ApplicationAttemptRoot";
+ private static final String CONTAINER_HISTORY_DATA_ROOT = "ContainerRoot";
+
+ private FileSystem fs;
+
+ private Path rootDirPath;
+ private Path appHistoryDataRoot;
+ private Path appAttemptHistoryDataRoot;
+ private Path containerHistoryDataRoot;
+
+ public FileSystemHistoryStorage() {
+ super(FileSystemHistoryStorage.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Path fsWorkingPath =
+ new Path(conf.get(YarnConfiguration.FS_HISTORY_STORAGE_URI));
+ rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT);
+ appAttemptHistoryDataRoot =
+ new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT);
+ containerHistoryDataRoot =
+ new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT);
+
+ try {
+ // init filesystem
+ fs = fsWorkingPath.getFileSystem(conf);
+ fs.mkdirs(appHistoryDataRoot);
+ fs.mkdirs(appAttemptHistoryDataRoot);
+ fs.mkdirs(containerHistoryDataRoot);
+ } catch (IOException e) {
+ LOG.error("Error init FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Error closing FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void writeApplication(ApplicationHistoryData app) throws Throwable {
+ ApplicationId appId = app.getApplicationId();
+ Path nodeCreatePath = new Path(appHistoryDataRoot, appId.toString());
+ LOG.info("Writing history data for Application: " + appId +
+ " at: " + nodeCreatePath);
+ assert app instanceof ApplicationHistoryDataPBImpl;
+ ApplicationHistoryDataPBImpl appPBImpl = (ApplicationHistoryDataPBImpl) app;
+ byte[] appHistoryData = appPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, appHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for Application: " + appId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
+ throws Throwable {
+ ApplicationAttemptId appAttemptId = appAttempt.getApplicationAttemptId();
+ Path nodeCreatePath =
+ new Path(appAttemptHistoryDataRoot, appAttemptId.toString());
+ LOG.info("Writing history data for ApplicationAttempt: " + appAttemptId +
+ " at: " + nodeCreatePath);
+ assert appAttempt instanceof ApplicationAttemptHistoryDataPBImpl;
+ ApplicationAttemptHistoryDataPBImpl appAttemptPBImpl =
+ (ApplicationAttemptHistoryDataPBImpl) appAttempt;
+ byte[] appAttemptHistoryData = appAttemptPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, appAttemptHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for ApplicationAttempt: " +
+ appAttemptId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeContainer(ContainerHistoryData container) throws Throwable {
+ ContainerId containerId = container.getContainerId();
+ Path nodeCreatePath =
+ new Path(containerHistoryDataRoot, containerId.toString());
+ LOG.info("Writing history data for Container: " + containerId +
+ " at: " + nodeCreatePath);
+ assert container instanceof ContainerHistoryDataPBImpl;
+ ContainerHistoryDataPBImpl containerPBImpl =
+ (ContainerHistoryDataPBImpl) container;
+ byte[] containerHistoryData = containerPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, containerHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for Container: " +
+ containerId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public ApplicationHistoryData getApplication(ApplicationId appId)
+ throws Throwable {
+ FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appHistoryDataRoot, childNodeName);
+ ApplicationId applicationId =
+ ConverterUtils.toApplicationId(childNodeName);
+ if (!applicatonId.equals(appId)) {
+ continue;
+ }
+ LOG.info("Loading application history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ return new ApplicationHistoryDataPBImpl(
+ ApplicationHistoryDataProto.parseFrom(childData));
+ } catch (IOException e) {
+ LOG.error("Error loading application history data from: " +
+ childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Map getAllApplications() {
+ FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot);
+ Map applications =
+ new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appHistoryDataRoot, childNodeName);
+ ApplicationId applicationId =
+ ConverterUtils.toApplicationId(childNodeName);
+ LOG.info("Loading application history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationHistoryData appHistoryData =
+ new ApplicationHistoryDataPBImpl(
+ ApplicationHistoryDataProto.parseFrom(childData));
+ applications.put(applicationId, appHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error loading application history data from: " +
+ childNodeName, e);
+ throw e;
+ }
+ }
+ return applications;
+ }
+
+ @Override
+ public Map
+ getApplicationAttempts(ApplicationId applicationId) throws Throwable {
+ FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot);
+ Map appAttempts =
+ new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName);
+ ApplicationAttemptId applicationAttemptId =
+ ConverterUtils.toApplicationAttemptId(childNodeName);
+ if (!applicationAttemptId.getApplicationId().equals(applicationId)) {
+ continue;
+ }
+ LOG.info("Loading application attempt history data from node: " +
+ childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationAttemptHistoryData appAttemptHistoryData =
+ new ApplicationAttemptHistoryDataPBImpl(
+ ApplicationAttemptHistoryDataProto.parseFrom(childData));
+ appAttempts.put(applicationAttemptId, appAttemptHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error loading application attempt history data from: " +
+ childNodeName, e);
+ throw e;
+ }
+ }
+ return appAttempts;
+ }
+
+ @Override
+ public ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws Throwable {
+ FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName);
+ ApplicationAttemptId applicationAttemptId =
+ ConverterUtils.toApplicationAttemptId(childNodeName);
+ if (!applicationAttemptId.equals(appAttemptId)) {
+ continue;
+ }
+ LOG.info("Loading application attempt history data from node: " +
+ childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationAttemptHistoryData appAttemptHistoryData =
+ new ApplicationAttemptHistoryDataPBImpl(
+ ApplicationAttemptHistoryDataProto.parseFrom(childData));
+ return appAttemptHistoryData;
+ } catch (IOException e) {
+ LOG.error("Error loading application attempt history data from: " +
+ childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ContainerHistoryData getContainer(
+ ContainerId containerId) throws Throwable {
+ FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(containerHistoryDataRoot, childNodeName);
+ ContainerId contId = ConverterUtils.toContainerId(childNodeName);
+ if (!containerId.equals(contId)) {
+ continue;
+ }
+ LOG.info("Loading container history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ContainerHistoryData containerHistoryData =
+ new ContainerHistoryDataPBImpl(
+ ContainerHistoryDataProto.parseFrom(childData));
+ return containerHistoryData;
+ } catch (IOException e) {
+ LOG.error("Error loading container history data from: " +
+ childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ContainerHistoryData getAMContainer(
+ ApplicationAttemptId appAttemptId) throws Throwable {
+ FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(containerHistoryDataRoot, childNodeName);
+ ContainerId containerId = ConverterUtils.toContainerId(childNodeName);
+ if (!containerId.getApplicationAttemptId().equals(appAttemptId)) {
+ continue;
+ }
+ LOG.info("Loading container history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ContainerHistoryData containerHistoryData =
+ new ContainerHistoryDataPBImpl(
+ ContainerHistoryDataProto.parseFrom(childData));
+ return containerHistoryData;
+ } catch (IOException e) {
+ LOG.error("Error loading container history data from: " +
+ childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ private byte[] readFile(Path inputPath, long len) throws IOException {
+ FSDataInputStream fsIn = fs.open(inputPath);
+ // state data will not be that "long"
+ byte[] data = new byte[(int)len];
+ fsIn.readFully(data);
+ fsIn.close();
+ return data;
+ }
+
+ private void writeFile(Path outputPath, byte[] data) throws IOException {
+ FSDataOutputStream fsOut = fs.create(outputPath, false);
+ fsOut.write(data);
+ fsOut.close();
+ }
+
+}