diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index febf095..df53406 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -278,6 +278,10 @@
public static final String FS_RM_STATE_STORE_URI =
RM_PREFIX + "fs.state-store.uri";
+ /** URI for FileSystemHistoryStorage */
+ public static final String FS_HISTORY_STORAGE_URI =
+ YARN_PREFIX + "ahs.fs-history-storage.uri";
+
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index ab8d50a..02a3853 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -269,6 +269,15 @@
+ URI pointing to the location of the FileSystem path where
+ the history will be persisted. This must be supplied when using
+ org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
+ as the value for yarn.resourcemanager.ahs.writer.class
+ yarn.ahs.fs-history-storage.uri
+ ${hadoop.tmp.dir}/yarn/system/ahstore
+
+
+
The maximum number of completed applications RM keeps.
yarn.resourcemanager.max-completed-applications
10000
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 33350b4..406bb3b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -26,9 +26,9 @@
4.0.0
org.apache.hadoop
- hadoop-yarn-server-applicationhistoryserver
+ hadoop-yarn-server-applicationhistoryservice
2.3.0-SNAPSHOT
- hadoop-yarn-server-applicationhistoryserver
+ hadoop-yarn-server-applicationhistoryservice
@@ -40,5 +40,11 @@
org.apache.hadoop
hadoop-yarn-server-common
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test-jar
+ test
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
index c721c63..ad17b48 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
@@ -35,24 +37,30 @@
/**
* This method persists an {@link ApplicationHistoryData} object.
- * @param app the {@link ApplicationHistoryData} object
- * @throws Throwable
+ *
+ * @param app
+ * the {@link ApplicationHistoryData} object
+ * @throws IOException
*/
- void writeApplication(ApplicationHistoryData app) throws Throwable;
+ void writeApplication(ApplicationHistoryData app) throws IOException;
/**
* This method persists an {@link ApplicationAttemptHistoryData} object.
- * @param appAttempt the {@link ApplicationAttemptHistoryData} object
- * @throws Throwable
+ *
+ * @param appAttempt
+ * the {@link ApplicationAttemptHistoryData} object
+ * @throws IOException
*/
- void writeApplicationAttempt(
- ApplicationAttemptHistoryData appAttempt) throws Throwable;
+ void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
+ throws IOException;
/**
* This method persists a {@link ContainerHistoryData} object.
- * @param container the {@link ContainerHistoryData} object
- * @throws Throwable
+ *
+ * @param container
+ * the {@link ContainerHistoryData} object
+ * @throws IOException
*/
- void writeContainer(ContainerHistoryData container) throws Throwable;
+ void writeContainer(ContainerHistoryData container) throws IOException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..48710eb
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptHistoryDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationHistoryDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerHistoryDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * The class implements the methods of reading and writing the history data in
+ * any storage that implements a basic {@link FileSystem} interface.
+ */
+@Private
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+ implements ApplicationHistoryStore {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemApplicationHistoryStore.class);
+
+ protected static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+ protected static final String APP_HISTORY_DATA_ROOT = "ApplicationRoot";
+ protected static final String APP_ATTEMPT_HISTORY_DATA_ROOT =
+ "ApplicationAttemptRoot";
+ protected static final String CONTAINER_HISTORY_DATA_ROOT = "ContainerRoot";
+
+ protected FileSystem fs;
+
+ protected Path appHistoryDataRoot;
+ protected Path appAttemptHistoryDataRoot;
+ protected Path containerHistoryDataRoot;
+
+ public FileSystemApplicationHistoryStore() {
+ super(FileSystemApplicationHistoryStore.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Path fsWorkingPath = new Path(
+ conf.get(YarnConfiguration.FS_HISTORY_STORAGE_URI));
+ Path rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT);
+ appAttemptHistoryDataRoot =
+ new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT);
+ containerHistoryDataRoot =
+ new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT);
+
+ try {
+ fs = fsWorkingPath.getFileSystem(conf);
+ fs.mkdirs(appHistoryDataRoot);
+ fs.mkdirs(appAttemptHistoryDataRoot);
+ fs.mkdirs(containerHistoryDataRoot);
+ } catch (IOException e) {
+ LOG.error("Error init FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Error closing FileSystemHistoryStorage", e);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void writeApplication(ApplicationHistoryData app) throws IOException {
+ ApplicationId appId = app.getApplicationId();
+ Path nodeCreatePath = new Path(appHistoryDataRoot, appId.toString());
+ LOG.info("Writing history data for Application: " + appId + " at: "
+ + nodeCreatePath);
+ assert app instanceof ApplicationHistoryDataPBImpl;
+ ApplicationHistoryDataPBImpl appPBImpl = (ApplicationHistoryDataPBImpl) app;
+ byte[] appHistoryData = appPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, appHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for Application: " + appId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
+ throws IOException {
+ ApplicationAttemptId appAttemptId = appAttempt.getApplicationAttemptId();
+ Path nodeCreatePath = new Path(appAttemptHistoryDataRoot,
+ appAttemptId.toString());
+ LOG.info("Writing history data for ApplicationAttempt: " + appAttemptId
+ + " at: " + nodeCreatePath);
+ assert appAttempt instanceof ApplicationAttemptHistoryDataPBImpl;
+ ApplicationAttemptHistoryDataPBImpl appAttemptPBImpl =
+ (ApplicationAttemptHistoryDataPBImpl) appAttempt;
+ byte[] appAttemptHistoryData = appAttemptPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, appAttemptHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for ApplicationAttempt: "
+ + appAttemptId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeContainer(
+ ContainerHistoryData container) throws IOException {
+ ContainerId containerId = container.getContainerId();
+ Path nodeCreatePath = new Path(containerHistoryDataRoot,
+ containerId.toString());
+ LOG.info("Writing history data for Container: " + containerId + " at: "
+ + nodeCreatePath);
+ assert container instanceof ContainerHistoryDataPBImpl;
+ ContainerHistoryDataPBImpl containerPBImpl =
+ (ContainerHistoryDataPBImpl) container;
+ byte[] containerHistoryData = containerPBImpl.getProto().toByteArray();
+ try {
+ writeFile(nodeCreatePath, containerHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error storing history data for Container: " + containerId, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public ApplicationHistoryData getApplication(ApplicationId appId)
+ throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appHistoryDataRoot, childNodeName);
+ ApplicationId applicationId = ConverterUtils
+ .toApplicationId(childNodeName);
+ if (!applicationId.equals(appId)) {
+ continue;
+ }
+ LOG.info("Loading application history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ return new ApplicationHistoryDataPBImpl(
+ ApplicationHistoryDataProto.parseFrom(childData));
+ } catch (IOException e) {
+ LOG.error("Error loading application history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Map getAllApplications()
+ throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot);
+ Map applications =
+ new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appHistoryDataRoot, childNodeName);
+ ApplicationId applicationId = ConverterUtils
+ .toApplicationId(childNodeName);
+ LOG.info("Loading application history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationHistoryData appHistoryData =
+ new ApplicationHistoryDataPBImpl(
+ ApplicationHistoryDataProto.parseFrom(childData));
+ applications.put(applicationId, appHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error loading application history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return applications;
+ }
+
+ @Override
+ public Map
+ getApplicationAttempts(ApplicationId applicationId) throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot);
+ Map appAttempts =
+ new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(childNodeName);
+ if (!applicationAttemptId.getApplicationId().equals(applicationId)) {
+ continue;
+ }
+ LOG.info("Loading application attempt history data from node: "
+ + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationAttemptHistoryData appAttemptHistoryData =
+ new ApplicationAttemptHistoryDataPBImpl(
+ ApplicationAttemptHistoryDataProto.parseFrom(childData));
+ appAttempts.put(applicationAttemptId, appAttemptHistoryData);
+ } catch (IOException e) {
+ LOG.error("Error loading application attempt history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return appAttempts;
+ }
+
+ @Override
+ public ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(childNodeName);
+ if (!applicationAttemptId.equals(appAttemptId)) {
+ continue;
+ }
+ LOG.info("Loading application attempt history data from node: "
+ + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ApplicationAttemptHistoryData appAttemptHistoryData =
+ new ApplicationAttemptHistoryDataPBImpl(
+ ApplicationAttemptHistoryDataProto.parseFrom(childData));
+ return appAttemptHistoryData;
+ } catch (IOException e) {
+ LOG.error("Error loading application attempt history data from: "
+ + childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ContainerHistoryData getContainer(ContainerId containerId)
+ throws IOException {
+ FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot);
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(containerHistoryDataRoot, childNodeName);
+ ContainerId contId = ConverterUtils.toContainerId(childNodeName);
+ if (!containerId.equals(contId)) {
+ continue;
+ }
+ LOG.info("Loading container history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ContainerHistoryData containerHistoryData =
+ new ContainerHistoryDataPBImpl(
+ ContainerHistoryDataProto.parseFrom(childData));
+ return containerHistoryData;
+ } catch (IOException e) {
+ LOG.error(
+ "Error loading container history data from: " + childNodeName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ ApplicationAttemptHistoryData appAttempt =
+ getApplicationAttempt(appAttemptId);
+ if (appAttempt == null) {
+ return null;
+ }
+ return this.getContainer(appAttempt.getMasterContainerId());
+ }
+
+ @Override
+ public Map getContainers(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot);
+ Map containers =
+ new HashMap();
+ for (FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = new Path(containerHistoryDataRoot, childNodeName);
+ ContainerId containerId = ConverterUtils.toContainerId(childNodeName);
+ if (!containerId.getApplicationAttemptId().equals(appAttemptId)) {
+ continue;
+ }
+ LOG.info("Loading container history data from node: " + childNodeName);
+ try {
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ContainerHistoryData containerHistoryData =
+ new ContainerHistoryDataPBImpl(
+ ContainerHistoryDataProto.parseFrom(childData));
+ containers.put(containerId, containerHistoryData);
+ } catch (IOException e) {
+ LOG.error(
+ "Error loading container history data from: " + childNodeName, e);
+ throw e;
+ }
+ }
+ return containers;
+ }
+
+ protected byte[] readFile(Path inputPath, long len) throws IOException {
+ FSDataInputStream fsIn = fs.open(inputPath);
+ // state data will not be that "long"
+ byte[] data = new byte[(int) len];
+ fsIn.readFully(data);
+ fsIn.close();
+ return data;
+ }
+
+ protected void writeFile(Path outputPath, byte[] data) throws IOException {
+ FSDataOutputStream fsOut = fs.create(outputPath, false);
+ fsOut.write(data);
+ fsOut.close();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
index 0070c35..0136e89 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
@@ -112,7 +112,7 @@ public ContainerHistoryData getContainer(ContainerId containerId) {
}
@Override
- public void writeApplication(ApplicationHistoryData app) throws Throwable {
+ public void writeApplication(ApplicationHistoryData app) throws IOException {
if (app != null) {
ApplicationHistoryData oldData =
applicationData.putIfAbsent(app.getApplicationId(), app);
@@ -125,7 +125,7 @@ public void writeApplication(ApplicationHistoryData app) throws Throwable {
@Override
public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
- throws Throwable {
+ throws IOException {
if (appAttempt != null) {
if (applicationAttemptData.containsKey(appAttempt
.getApplicationAttemptId().getApplicationId())) {
@@ -151,7 +151,7 @@ public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
}
@Override
- public void writeContainer(ContainerHistoryData container) throws Throwable {
+ public void writeContainer(ContainerHistoryData container) throws IOException {
if (container != null) {
ContainerHistoryData oldContainer =
containerData.putIfAbsent(container.getContainerId(), container);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..13f551d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFileSystemApplicationHistoryStore {
+
+ private Path fsWorkingPath;
+ private MiniDFSCluster cluster;
+ private TestFSApplicationHistoryStore store;
+
+ @Before
+ public void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(
+ new HdfsConfiguration()).numDataNodes(1).build();
+ store = new TestFSApplicationHistoryStore();
+ store.init(new YarnConfiguration());
+ store.start();
+ Assert.assertTrue(store.fs.exists(store.appHistoryDataRoot));
+ Assert.assertTrue(store.fs.exists(store.appAttemptHistoryDataRoot));
+ Assert.assertTrue(store.fs.exists(store.containerHistoryDataRoot));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.stop();
+ cluster.getFileSystem().delete(fsWorkingPath, true);
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testReadWriteApplicationHistoryData() throws Exception {
+ ApplicationHistoryData appToWrite1 = mockApplicationHistoryData(1);
+ ApplicationHistoryData appToWrite2 = mockApplicationHistoryData(2);
+ store.writeApplication(appToWrite1);
+ store.writeApplication(appToWrite2);
+ // get application by appId
+ ApplicationHistoryData appRead1 =
+ store.getApplication(appToWrite1.getApplicationId());
+ Assert.assertNotNull(appRead1);
+ Assert.assertEquals(
+ appToWrite1.getApplicationId(), appRead1.getApplicationId());
+ ApplicationHistoryData appRead2 =
+ store.getApplication(appToWrite2.getApplicationId());
+ Assert.assertNotNull(appRead2);
+ Assert.assertEquals(
+ appToWrite2.getApplicationId(), appRead2.getApplicationId());
+ // get all applications
+ Map applications =
+ store.getAllApplications();
+ Assert.assertEquals(2, applications.size());
+ appRead1 = applications.get(appToWrite1.getApplicationId());
+ Assert.assertNotNull(appRead1);
+ Assert.assertEquals(
+ appToWrite1.getApplicationId(), appRead1.getApplicationId());
+ appRead2 = applications.get(appToWrite2.getApplicationId());
+ Assert.assertNotNull(appRead2);
+ Assert.assertEquals(
+ appToWrite2.getApplicationId(), appRead2.getApplicationId());
+ }
+
+ @Test
+ public void testReadWriteApplicationAttemptHistoryData() throws Exception {
+ ApplicationAttemptHistoryData appAttemptToWrite1 =
+ mockApplicationAttemptHistoryData(1, 1);
+ ApplicationAttemptHistoryData appAttemptToWrite2 =
+ mockApplicationAttemptHistoryData(1, 2);
+ store.writeApplicationAttempt(appAttemptToWrite1);
+ store.writeApplicationAttempt(appAttemptToWrite2);
+ // get attempt by attemptId
+ ApplicationAttemptHistoryData appAttemptRead1 =
+ store.getApplicationAttempt(
+ appAttemptToWrite1.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead1);
+ Assert.assertEquals(appAttemptToWrite1, appAttemptRead1);
+ ApplicationAttemptHistoryData appAttemptRead2 =
+ store.getApplicationAttempt(
+ appAttemptToWrite2.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead2);
+ Assert.assertEquals(appAttemptToWrite2, appAttemptRead2);
+ // get attempts by appId
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ Map appAttempts =
+ store.getApplicationAttempts(appId);
+ Assert.assertEquals(2, appAttempts.size());
+ appAttemptRead1 =
+ appAttempts.get(appAttemptToWrite1.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead1);
+ Assert.assertEquals(appAttemptToWrite1, appAttemptRead1);
+ appAttemptRead2 =
+ appAttempts.get(appAttemptToWrite2.getApplicationAttemptId());
+ Assert.assertNotNull(appAttemptRead2);
+ Assert.assertEquals(appAttemptToWrite2, appAttemptRead2);
+ }
+
+ @Test
+ public void testReadWriteContainerHistoryData() throws Exception {
+ ApplicationAttemptHistoryData appAttemptToWrite1 =
+ mockApplicationAttemptHistoryData(1, 1);
+ ApplicationAttemptHistoryData appAttemptToWrite2 =
+ mockApplicationAttemptHistoryData(1, 2);
+ store.writeApplicationAttempt(appAttemptToWrite1);
+ store.writeApplicationAttempt(appAttemptToWrite2);
+ ContainerHistoryData containerToWrite1 =
+ mockContainerHistoryData(1, 1, 1);
+ ContainerHistoryData containerToWrite2 =
+ mockContainerHistoryData(1, 2, 1);
+ ContainerHistoryData containerToWrite3 =
+ mockContainerHistoryData(1, 2, 2);
+ store.writeContainer(containerToWrite1);
+ store.writeContainer(containerToWrite2);
+ store.writeContainer(containerToWrite3);
+ // get container by containerId
+ ContainerHistoryData containerRead1 =
+ store.getContainer(containerToWrite1.getContainerId());
+ Assert.assertNotNull(containerRead1);
+ Assert.assertEquals(containerToWrite1, containerRead1);
+ ContainerHistoryData containerRead2 =
+ store.getContainer(containerToWrite2.getContainerId());
+ Assert.assertNotNull(containerRead2);
+ Assert.assertEquals(containerToWrite2, containerRead2);
+ ContainerHistoryData containerRead3 =
+ store.getContainer(containerToWrite3.getContainerId());
+ Assert.assertNotNull(containerRead3);
+ Assert.assertEquals(containerToWrite3, containerRead3);
+ // get AM container by attemptId
+ ApplicationAttemptId appAttemptId1 =
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
+ ApplicationAttemptId appAttemptId2 =
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 2);
+ containerRead1 = store.getAMContainer(appAttemptId1);
+ Assert.assertNotNull(containerRead1);
+ Assert.assertEquals(containerToWrite1, containerRead1);
+ containerRead2 = store.getAMContainer(appAttemptId2);
+ Assert.assertNotNull(containerRead2);
+ Assert.assertEquals(containerToWrite2, containerRead2);
+ // get containers by attemptId
+ Map containers =
+ store.getContainers(appAttemptId2);
+ Assert.assertEquals(2, containers.size());
+ containerRead2 = containers.get(containerToWrite2.getContainerId());
+ Assert.assertNotNull(containerRead2);
+ Assert.assertEquals(containerToWrite2, containerRead2);
+ containerRead3 = containers.get(containerToWrite3.getContainerId());
+ Assert.assertNotNull(containerRead3);
+ Assert.assertEquals(containerToWrite3, containerRead3);
+ }
+
+ private class TestFSApplicationHistoryStore
+ extends FileSystemApplicationHistoryStore {
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ fsWorkingPath =
+ new Path(new Path(cluster.getURI()), new Path("/Test"));
+ Path rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT);
+ appAttemptHistoryDataRoot =
+ new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT);
+ containerHistoryDataRoot =
+ new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT);
+
+ fs = cluster.getFileSystem();
+ fs.mkdirs(appHistoryDataRoot);
+ fs.mkdirs(appAttemptHistoryDataRoot);
+ fs.mkdirs(containerHistoryDataRoot);
+ }
+ }
+
+ private static ApplicationHistoryData mockApplicationHistoryData(int appId) {
+ ApplicationHistoryData app = new ApplicationHistoryDataPBImpl();
+ app.setApplicationId(
+ ApplicationId.newInstance(0, appId));
+ return app;
+ }
+
+ private static ApplicationAttemptHistoryData
+ mockApplicationAttemptHistoryData(int appId, int appAttemptId) {
+ ApplicationAttemptHistoryData appAttempt =
+ new ApplicationAttemptHistoryDataPBImpl();
+ appAttempt.setApplicationAttemptId(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, appId), appAttemptId));
+ appAttempt.setMasterContainerId(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, appId), appAttemptId), 1));
+ return appAttempt;
+ }
+
+ private static ContainerHistoryData mockContainerHistoryData(
+ int appId, int appAttemptId, int containerId) {
+ ContainerHistoryData container = new ContainerHistoryDataPBImpl();
+ container.setContainerId(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, appId), appAttemptId),
+ containerId));
+ return container;
+ }
+}