diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 44c35c3..73b6a6c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -261,6 +261,10 @@
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.rm-state-store.uri";
+ /** URI for FileSystemHistoryStorage */
+ public static final String FS_HISTORY_STORAGE_URI = YARN_PREFIX +
+ "ahs.fs-history-storage.uri";
+
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b6753bc..f460707 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -255,6 +255,15 @@
+ URI pointing to the location of the FileSystem path where
+ the history will be persisted. This must be supplied when using
+ org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemHistoryStorage
+ as the value for yarn.resourcemanager.ahs.writer.class
+ yarn.ahs.fs-history-storage.uri
+ ${hadoop.tmp.dir}/yarn/system/ahstore
+
+
+
The maximum number of completed applications RM keeps.
yarn.resourcemanager.max-completed-applications
10000
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index ed04555..07f7fd5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -26,9 +26,9 @@
4.0.0
org.apache.hadoop
- hadoop-yarn-server-applicationhistoryserver
+ hadoop-yarn-server-applicationhistoryservice
2.2.0-SNAPSHOT
- hadoop-yarn-server-applicationhistoryserver
+ hadoop-yarn-server-applicationhistoryservice
@@ -40,5 +40,11 @@
org.apache.hadoop
hadoop-yarn-server-common
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test-jar
+ test
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
index e8e90fc..7969e95 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice;
+import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
@@ -34,19 +36,22 @@
public interface ApplicationHistoryReader {
/**
- * This method returns Application {@link ApplicationHistoryData} for the specified
- * {@link ApplicationId}.
+ * This method returns Application {@link ApplicationHistoryData} for the
+ * specified {@link ApplicationId}.
*
* @return {@link ApplicationHistoryData} for the ApplicationId.
+ * @throws IOException
*/
- ApplicationHistoryData getApplication(ApplicationId appId);
+ ApplicationHistoryData getApplication(ApplicationId appId) throws IOException;
/**
* This method returns all Application {@link ApplicationHistoryData}s
*
* @return map {@link ApplicationId, @link ApplicationHistoryData}s.
+ * @throws IOException
*/
- Map getAllApplications();
+ Map getAllApplications()
+ throws Throwable;
/**
* Application can have multiple application attempts
@@ -54,9 +59,10 @@
* {@link ApplicationAttemptHistoryData}s for the Application.
*
* @return all {@link ApplicationAttemptHistoryData}s for the Application.
+ * @throws IOException
*/
Map getApplicationAttempts(
- ApplicationId appId);
+ ApplicationId appId) throws IOException;
/**
* This method returns {@link ApplicationAttemptHistoryData} for specified
@@ -66,15 +72,16 @@
* @return {@link ApplicationAttemptHistoryData} for ApplicationAttemptId
*/
ApplicationAttemptHistoryData getApplicationAttempt(
- ApplicationAttemptId appAttemptId);
+ ApplicationAttemptId appAttemptId) throws IOException;
/**
* This method returns {@link Container} for specified {@link ContainerId}.
*
* @param {@link ContainerId}
* @return {@link Container} for ContainerId
+ * @throws IOException
*/
- ContainerHistoryData getAMContainer(ContainerId containerId);
+ ContainerHistoryData getContainer(ContainerId containerId) throws IOException;
/**
* This method returns {@link ContainerHistoryData} for specified
@@ -82,6 +89,8 @@ ApplicationAttemptHistoryData getApplicationAttempt(
*
* @param {@link ApplicationAttemptId}
* @return {@link ContainerHistoryData} for ApplicationAttemptId
+ * @throws IOException
*/
- ContainerHistoryData getContainer(ApplicationAttemptId appAttemptId);
+ ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
index c721c63..ad17b48 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
@@ -35,24 +37,30 @@
/**
* This method persists an {@link ApplicationHistoryData} object.
- * @param app the {@link ApplicationHistoryData} object
- * @throws Throwable
+ *
+ * @param app
+ * the {@link ApplicationHistoryData} object
+ * @throws IOException
*/
- void writeApplication(ApplicationHistoryData app) throws Throwable;
+ void writeApplication(ApplicationHistoryData app) throws IOException;
/**
* This method persists an {@link ApplicationAttemptHistoryData} object.
- * @param appAttempt the {@link ApplicationAttemptHistoryData} object
- * @throws Throwable
+ *
+ * @param appAttempt
+ * the {@link ApplicationAttemptHistoryData} object
+ * @throws IOException
*/
- void writeApplicationAttempt(
- ApplicationAttemptHistoryData appAttempt) throws Throwable;
+ void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
+ throws IOException;
/**
* This method persists a {@link ContainerHistoryData} object.
- * @param container the {@link ContainerHistoryData} object
- * @throws Throwable
+ *
+ * @param container
+ * the {@link ContainerHistoryData} object
+ * @throws IOException
*/
- void writeContainer(ContainerHistoryData container) throws Throwable;
+ void writeContainer(ContainerHistoryData container) throws IOException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java
new file mode 100644
index 0000000..2bbbf72
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptHistoryDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationHistoryDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerHistoryDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * The class implements the methods of reading and writing the history data in
+ * any storage that implements a basic {@link FileSystem} interface.
+ */
+@Private
+@Unstable
+public class FileSystemHistoryStorage extends AbstractService implements
+ ApplicationHistoryReader, ApplicationHistoryWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemHistoryStorage.class);
+
+ protected static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+ protected static final String APP_HISTORY_DATA_ROOT = "ApplicationRoot";
+ protected static final String APP_ATTEMPT_HISTORY_DATA_ROOT =
+ "ApplicationAttemptRoot";
+ protected static final String CONTAINER_HISTORY_DATA_ROOT = "ContainerRoot";
+
+ protected FileSystem fs;
+
+ protected Path appHistoryDataRoot;
+ protected Path appAttemptHistoryDataRoot;
+ protected Path containerHistoryDataRoot;
+
+ public FileSystemHistoryStorage() {
+ super(FileSystemHistoryStorage.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Path fsWorkingPath = new Path(
+ conf.get(YarnConfiguration.FS_HISTORY_STORAGE_URI));
+ Path rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT);
+ appAttemptHistoryDataRoot =
+ new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT);
+ containerHistoryDataRoot =
+ new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT);
+
+ try {
+ fs = fsWorkingPath.getFileSystem(conf);
+ fs.mkdirs(appHistoryDataRoot);
+ fs.mkdirs(appAttemptHistoryDataRoot);
+ fs.mkdirs(containerHistoryDataRoot);
+ } catch (IOException e) {
+ LOG.error("Error init FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Error closing FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void writeApplication(ApplicationHistoryData app) throws IOException {
+ ApplicationId appId = app.getApplicationId();
+ Path nodeCreatePath = new Path(appHistoryDataRoot, appId.toString());
+ LOG.info("Writing history data for Application: " + appId + " at: "
+ + nodeCreatePath);
+ assert app instanceof ApplicationHistoryDataPBImpl;
+ ApplicationHistoryDataPBImpl appPBImpl = (ApplicationHistoryDataPBImpl) app;
+ byte[] appHistoryData = appPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, appHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for Application: " + appId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
+ throws IOException {
+ ApplicationAttemptId appAttemptId = appAttempt.getApplicationAttemptId();
+ Path nodeCreatePath = new Path(appAttemptHistoryDataRoot,
+ appAttemptId.toString());
+ LOG.info("Writing history data for ApplicationAttempt: " + appAttemptId
+ + " at: " + nodeCreatePath);
+ assert appAttempt instanceof ApplicationAttemptHistoryDataPBImpl;
+ ApplicationAttemptHistoryDataPBImpl appAttemptPBImpl = (ApplicationAttemptHistoryDataPBImpl) appAttempt;
+ byte[] appAttemptHistoryData = appAttemptPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, appAttemptHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for ApplicationAttempt: "
+ + appAttemptId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeContainer(ContainerHistoryData container) throws IOException {
+ ContainerId containerId = container.getContainerId();
+ Path nodeCreatePath = new Path(containerHistoryDataRoot,
+ containerId.toString());
+ LOG.info("Writing history data for Container: " + containerId + " at: "
+ + nodeCreatePath);
+ assert container instanceof ContainerHistoryDataPBImpl;
+ ContainerHistoryDataPBImpl containerPBImpl = (ContainerHistoryDataPBImpl) container;
+ byte[] containerHistoryData = containerPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, containerHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for Container: " + containerId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public ApplicationHistoryData getApplication(ApplicationId appId)
+ throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appHistoryDataRoot, childNodeName);
+ ApplicationId applicationId = ConverterUtils
+ .toApplicationId(childNodeName);
+ if (!applicationId.equals(appId)) {
+ continue;
+ }
+ LOG.info("Loading application history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ return new ApplicationHistoryDataPBImpl(
+ ApplicationHistoryDataProto.parseFrom(childData));
+ } catch (IOException e) {
+ LOG.error("Error loading application history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Map getAllApplications()
+ throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot);
+ Map applications = new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appHistoryDataRoot, childNodeName);
+ ApplicationId applicationId = ConverterUtils
+ .toApplicationId(childNodeName);
+ LOG.info("Loading application history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationHistoryData appHistoryData = new ApplicationHistoryDataPBImpl(
+ ApplicationHistoryDataProto.parseFrom(childData));
+ applications.put(applicationId, appHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error loading application history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return applications;
+ }
+
+ @Override
+ public Map getApplicationAttempts(
+ ApplicationId applicationId) throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot);
+ Map appAttempts = new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(childNodeName);
+ if (!applicationAttemptId.getApplicationId().equals(applicationId)) {
+ continue;
+ }
+ LOG.info("Loading application attempt history data from node: "
+ + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationAttemptHistoryData appAttemptHistoryData = new ApplicationAttemptHistoryDataPBImpl(
+ ApplicationAttemptHistoryDataProto.parseFrom(childData));
+ appAttempts.put(applicationAttemptId, appAttemptHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error loading application attempt history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return appAttempts;
+ }
+
+ @Override
+ public ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(childNodeName);
+ if (!applicationAttemptId.equals(appAttemptId)) {
+ continue;
+ }
+ LOG.info("Loading application attempt history data from node: "
+ + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationAttemptHistoryData appAttemptHistoryData = new ApplicationAttemptHistoryDataPBImpl(
+ ApplicationAttemptHistoryDataProto.parseFrom(childData));
+ return appAttemptHistoryData;
+ } catch (IOException e) {
+ LOG.error("Error loading application attempt history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ContainerHistoryData getContainer(ContainerId containerId)
+ throws IOException {
+ FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(containerHistoryDataRoot, childNodeName);
+ ContainerId contId = ConverterUtils.toContainerId(childNodeName);
+ if (!containerId.equals(contId)) {
+ continue;
+ }
+ LOG.info("Loading container history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ContainerHistoryData containerHistoryData = new ContainerHistoryDataPBImpl(
+ ContainerHistoryDataProto.parseFrom(childData));
+ return containerHistoryData;
+ } catch (IOException e) {
+ LOG.error(
+ "Error loading container history data from: " + childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ ApplicationAttemptHistoryData appAttempt = getApplicationAttempt(appAttemptId);
+ if (appAttempt == null) {
+ return null;
+ }
+ return this.getContainer(appAttempt.getMasterContainerId());
+ }
+
+ protected byte[] readFile(Path inputPath, long len) throws IOException {
+ FSDataInputStream fsIn = fs.open(inputPath);
+ // state data will not be that "long"
+ byte[] data = new byte[(int) len];
+ fsIn.readFully(data);
+ fsIn.close();
+ return data;
+ }
+
+ protected void writeFile(Path outputPath, byte[] data) throws IOException {
+ FSDataOutputStream fsOut = fs.create(outputPath, false);
+ fsOut.write(data);
+ fsOut.close();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemHistoryStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemHistoryStorage.java
new file mode 100644
index 0000000..ffe476c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemHistoryStorage.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFileSystemHistoryStorage {
+
+ private MiniDFSCluster cluster;
+ private TestFSHistoryStorage storage;
+
+ @Before
+ public void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(
+ new HdfsConfiguration()).numDataNodes(1).build();
+ storage = new TestFSHistoryStorage();
+ storage.init(new YarnConfiguration());
+ storage.start();
+ Assert.assertTrue(storage.fs.exists(storage.appHistoryDataRoot));
+ Assert.assertTrue(storage.fs.exists(storage.appAttemptHistoryDataRoot));
+ Assert.assertTrue(storage.fs.exists(storage.containerHistoryDataRoot));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ storage.stop();
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testReadWriteApplicationHistoryData() throws Exception {
+ ApplicationHistoryData appToWrite1 = mockApplicationHistoryData(1);
+ ApplicationHistoryData appToWrite2 = mockApplicationHistoryData(2);
+ storage.writeApplication(appToWrite1);
+ storage.writeApplication(appToWrite2);
+ ApplicationHistoryData appRead1 =
+ storage.getApplication(appToWrite1.getApplicationId());
+ Assert.assertNotNull(appRead1);
+ Assert.assertEquals(
+ appToWrite1.getApplicationId(), appRead1.getApplicationId());
+ ApplicationHistoryData appRead2 =
+ storage.getApplication(appToWrite2.getApplicationId());
+ Assert.assertNotNull(appRead2);
+ Assert.assertEquals(
+ appToWrite2.getApplicationId(), appRead2.getApplicationId());
+ Map applications =
+ storage.getAllApplications();
+ Assert.assertEquals(2, applications.size());
+ appRead1 = applications.get(appToWrite1.getApplicationId());
+ Assert.assertNotNull(appRead1);
+ Assert.assertEquals(
+ appToWrite1.getApplicationId(), appRead1.getApplicationId());
+ appRead2 = applications.get(appToWrite2.getApplicationId());
+ Assert.assertNotNull(appRead2);
+ Assert.assertEquals(
+ appToWrite2.getApplicationId(), appRead2.getApplicationId());
+ }
+
+ @Test
+ public void testReadWriteApplicationAttemptHistoryData() throws Exception {
+ ApplicationAttemptHistoryData appAttemptToWrite1 =
+ mockApplicationAttemptHistoryData(1, 1);
+ ApplicationAttemptHistoryData appAttemptToWrite2 =
+ mockApplicationAttemptHistoryData(1, 2);
+ storage.writeApplicationAttempt(appAttemptToWrite1);
+ storage.writeApplicationAttempt(appAttemptToWrite2);
+ ApplicationAttemptHistoryData appAttemptRead1 =
+ storage.getApplicationAttempt(
+ appAttemptToWrite1.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead1);
+ Assert.assertEquals(appAttemptToWrite1, appAttemptRead1);
+ ApplicationAttemptHistoryData appAttemptRead2 =
+ storage.getApplicationAttempt(
+ appAttemptToWrite2.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead2);
+ Assert.assertEquals(appAttemptToWrite2, appAttemptRead2);
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ Map appAttempts =
+ storage.getApplicationAttempts(appId);
+ Assert.assertEquals(2, appAttempts.size());
+ appAttemptRead1 =
+ appAttempts.get(appAttemptToWrite1.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead1);
+ Assert.assertEquals(appAttemptToWrite1, appAttemptRead1);
+ appAttemptRead2 =
+ appAttempts.get(appAttemptToWrite2.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead2);
+ Assert.assertEquals(appAttemptToWrite2, appAttemptRead2);
+ }
+
+ @Test
+ public void testReadWriteContainerHistoryData() throws Exception {
+ ApplicationAttemptHistoryData appAttemptToWrite1 =
+ mockApplicationAttemptHistoryData(1, 1);
+ ApplicationAttemptHistoryData appAttemptToWrite2 =
+ mockApplicationAttemptHistoryData(1, 2);
+ storage.writeApplicationAttempt(appAttemptToWrite1);
+ storage.writeApplicationAttempt(appAttemptToWrite2);
+ ContainerHistoryData containerToWrite1 =
+ mockContainerHistoryData(1, 1, 1);
+ ContainerHistoryData containerToWrite2 =
+ mockContainerHistoryData(1, 2, 1);
+ storage.writeContainer(containerToWrite1);
+ storage.writeContainer(containerToWrite2);
+ ContainerHistoryData containerRead1 =
+ storage.getContainer(containerToWrite1.getContainerId());
+ Assert.assertNotNull(containerRead1);
+ Assert.assertEquals(containerToWrite1, containerRead1);
+ ContainerHistoryData containerRead2 =
+ storage.getContainer(containerToWrite2.getContainerId());
+ Assert.assertNotNull(containerRead2);
+ Assert.assertEquals(containerToWrite2, containerRead2);
+ ApplicationAttemptId appAttemptId1 =
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
+ ApplicationAttemptId appAttemptId2 =
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 2);
+ containerRead1 = storage.getAMContainer(appAttemptId1);
+ Assert.assertNotNull(containerRead1);
+ Assert.assertEquals(containerToWrite1, containerRead1);
+ containerRead2 = storage.getAMContainer(appAttemptId2);
+ Assert.assertNotNull(containerRead2);
+ Assert.assertEquals(containerToWrite2, containerRead2);
+ }
+
+ private class TestFSHistoryStorage extends FileSystemHistoryStorage {
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Path fsWorkingPath =
+ new Path(new Path(cluster.getURI()), new Path("/Test"));
+ Path rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT);
+ appAttemptHistoryDataRoot =
+ new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT);
+ containerHistoryDataRoot =
+ new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT);
+
+ fs = cluster.getFileSystem();
+ fs.mkdirs(appHistoryDataRoot);
+ fs.mkdirs(appAttemptHistoryDataRoot);
+ fs.mkdirs(containerHistoryDataRoot);
+ }
+ }
+
+ private static ApplicationHistoryData mockApplicationHistoryData(int appId) {
+ ApplicationHistoryData app = new ApplicationHistoryDataPBImpl();
+ app.setApplicationId(
+ ApplicationId.newInstance(0, appId));
+ return app;
+ }
+
+ private static ApplicationAttemptHistoryData
+ mockApplicationAttemptHistoryData(int appId, int appAttemptId) {
+ ApplicationAttemptHistoryData appAttempt =
+ new ApplicationAttemptHistoryDataPBImpl();
+ appAttempt.setApplicationAttemptId(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, appId), appAttemptId));
+ appAttempt.setMasterContainerId(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, appId), appAttemptId), 1));
+ return appAttempt;
+ }
+
+ private static ContainerHistoryData mockContainerHistoryData(
+ int appId, int appAttemptId, int containerId) {
+ ContainerHistoryData container = new ContainerHistoryDataPBImpl();
+ container.setContainerId(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, appId), appAttemptId),
+ containerId));
+ return container;
+ }
+}