diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index febf095..df53406 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -278,6 +278,10 @@ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; + /** URI for FileSystemHistoryStorage */ + public static final String FS_HISTORY_STORAGE_URI = + YARN_PREFIX + "ahs.fs-history-storage.uri"; + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ab8d50a..02a3853 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -269,6 +269,15 @@ + URI pointing to the location of the FileSystem path where + the history will be persisted. This must be supplied when using + org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore + as the value for yarn.resourcemanager.ahs.writer.class + yarn.ahs.fs-history-storage.uri + ${hadoop.tmp.dir}/yarn/system/ahstore + + + The maximum number of completed applications RM keeps. yarn.resourcemanager.max-completed-applications 10000 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index 33350b4..406bb3b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -26,9 +26,9 @@ 4.0.0 org.apache.hadoop - hadoop-yarn-server-applicationhistoryserver + hadoop-yarn-server-applicationhistoryservice 2.3.0-SNAPSHOT - hadoop-yarn-server-applicationhistoryserver + hadoop-yarn-server-applicationhistoryservice @@ -40,5 +40,11 @@ org.apache.hadoop hadoop-yarn-server-common + + org.apache.hadoop + hadoop-hdfs + test-jar + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java index c721c63..ad17b48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; @@ -35,24 +37,30 @@ /** * This method persists an {@link ApplicationHistoryData} object. - * @param app the {@link ApplicationHistoryData} object - * @throws Throwable + * + * @param app + * the {@link ApplicationHistoryData} object + * @throws IOException */ - void writeApplication(ApplicationHistoryData app) throws Throwable; + void writeApplication(ApplicationHistoryData app) throws IOException; /** * This method persists an {@link ApplicationAttemptHistoryData} object. - * @param appAttempt the {@link ApplicationAttemptHistoryData} object - * @throws Throwable + * + * @param appAttempt + * the {@link ApplicationAttemptHistoryData} object + * @throws IOException */ - void writeApplicationAttempt( - ApplicationAttemptHistoryData appAttempt) throws Throwable; + void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt) + throws IOException; /** * This method persists a {@link ContainerHistoryData} object. - * @param container the {@link ContainerHistoryData} object - * @throws Throwable + * + * @param container + * the {@link ContainerHistoryData} object + * @throws IOException */ - void writeContainer(ContainerHistoryData container) throws Throwable; + void writeContainer(ContainerHistoryData container) throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java new file mode 100644 index 0000000..48710eb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptHistoryDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationHistoryDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerHistoryDataProto; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * The class implements the methods of reading and writing the history data in + * any storage that implements a basic {@link FileSystem} interface. + */ +@Private +@Unstable +public class FileSystemApplicationHistoryStore extends AbstractService + implements ApplicationHistoryStore { + + private static final Log LOG = LogFactory + .getLog(FileSystemApplicationHistoryStore.class); + + protected static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot"; + protected static final String APP_HISTORY_DATA_ROOT = "ApplicationRoot"; + protected static final String APP_ATTEMPT_HISTORY_DATA_ROOT = + "ApplicationAttemptRoot"; + protected static final String CONTAINER_HISTORY_DATA_ROOT = "ContainerRoot"; + + protected FileSystem fs; + + protected Path appHistoryDataRoot; + protected Path appAttemptHistoryDataRoot; + protected Path containerHistoryDataRoot; + + public FileSystemApplicationHistoryStore() { + super(FileSystemApplicationHistoryStore.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Path fsWorkingPath = new Path( + conf.get(YarnConfiguration.FS_HISTORY_STORAGE_URI)); + Path rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT); + appAttemptHistoryDataRoot = + new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT); + containerHistoryDataRoot = + new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT); + + try { + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(appHistoryDataRoot); + fs.mkdirs(appAttemptHistoryDataRoot); + fs.mkdirs(containerHistoryDataRoot); + } catch (IOException e) { + LOG.error("Error init FileSystemHistoryStorage", e); + throw e; + } + } + + @Override + public void serviceStop() throws Exception { + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + LOG.error("Error closing FileSystemHistoryStorage", e); + throw e; + } + } + } + + @Override + public void writeApplication(ApplicationHistoryData app) throws IOException { + ApplicationId appId = app.getApplicationId(); + Path nodeCreatePath = new Path(appHistoryDataRoot, appId.toString()); + LOG.info("Writing history data for Application: " + appId + " at: " + + nodeCreatePath); + assert app instanceof ApplicationHistoryDataPBImpl; + ApplicationHistoryDataPBImpl appPBImpl = (ApplicationHistoryDataPBImpl) app; + byte[] appHistoryData = appPBImpl.getProto().toByteArray(); + try { + writeFile(nodeCreatePath, appHistoryData); + } catch (IOException e) { + LOG.error("Error storing history data for Application: " + appId, e); + throw e; + } + } + + @Override + public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt) + throws IOException { + ApplicationAttemptId appAttemptId = appAttempt.getApplicationAttemptId(); + Path nodeCreatePath = new Path(appAttemptHistoryDataRoot, + appAttemptId.toString()); + LOG.info("Writing history data for ApplicationAttempt: " + appAttemptId + + " at: " + nodeCreatePath); + assert appAttempt instanceof ApplicationAttemptHistoryDataPBImpl; + ApplicationAttemptHistoryDataPBImpl appAttemptPBImpl = + (ApplicationAttemptHistoryDataPBImpl) appAttempt; + byte[] appAttemptHistoryData = appAttemptPBImpl.getProto().toByteArray(); + try { + writeFile(nodeCreatePath, appAttemptHistoryData); + } catch (IOException e) { + LOG.error("Error storing history data for ApplicationAttempt: " + + appAttemptId, e); + throw e; + } + } + + @Override + public void writeContainer( + ContainerHistoryData container) throws IOException { + ContainerId containerId = container.getContainerId(); + Path nodeCreatePath = new Path(containerHistoryDataRoot, + containerId.toString()); + LOG.info("Writing history data for Container: " + containerId + " at: " + + nodeCreatePath); + assert container instanceof ContainerHistoryDataPBImpl; + ContainerHistoryDataPBImpl containerPBImpl = + (ContainerHistoryDataPBImpl) container; + byte[] containerHistoryData = containerPBImpl.getProto().toByteArray(); + try { + writeFile(nodeCreatePath, containerHistoryData); + } catch (IOException e) { + LOG.error("Error storing history data for Container: " + containerId, e); + throw e; + } + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appHistoryDataRoot, childNodeName); + ApplicationId applicationId = ConverterUtils + .toApplicationId(childNodeName); + if (!applicationId.equals(appId)) { + continue; + } + LOG.info("Loading application history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + return new ApplicationHistoryDataPBImpl( + ApplicationHistoryDataProto.parseFrom(childData)); + } catch (IOException e) { + LOG.error("Error loading application history data from: " + + childNodeName, e); + throw e; + } + } + return null; + } + + @Override + public Map getAllApplications() + throws IOException { + FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot); + Map applications = + new HashMap(); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appHistoryDataRoot, childNodeName); + ApplicationId applicationId = ConverterUtils + .toApplicationId(childNodeName); + LOG.info("Loading application history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ApplicationHistoryData appHistoryData = + new ApplicationHistoryDataPBImpl( + ApplicationHistoryDataProto.parseFrom(childData)); + applications.put(applicationId, appHistoryData); + } catch (IOException e) { + LOG.error("Error loading application history data from: " + + childNodeName, e); + throw e; + } + } + return applications; + } + + @Override + public Map + getApplicationAttempts(ApplicationId applicationId) throws IOException { + FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot); + Map appAttempts = + new HashMap(); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName); + ApplicationAttemptId applicationAttemptId = ConverterUtils + .toApplicationAttemptId(childNodeName); + if (!applicationAttemptId.getApplicationId().equals(applicationId)) { + continue; + } + LOG.info("Loading application attempt history data from node: " + + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ApplicationAttemptHistoryData appAttemptHistoryData = + new ApplicationAttemptHistoryDataPBImpl( + ApplicationAttemptHistoryDataProto.parseFrom(childData)); + appAttempts.put(applicationAttemptId, appAttemptHistoryData); + } catch (IOException e) { + LOG.error("Error loading application attempt history data from: " + + childNodeName, e); + throw e; + } + } + return appAttempts; + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName); + ApplicationAttemptId applicationAttemptId = ConverterUtils + .toApplicationAttemptId(childNodeName); + if (!applicationAttemptId.equals(appAttemptId)) { + continue; + } + LOG.info("Loading application attempt history data from node: " + + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ApplicationAttemptHistoryData appAttemptHistoryData = + new ApplicationAttemptHistoryDataPBImpl( + ApplicationAttemptHistoryDataProto.parseFrom(childData)); + return appAttemptHistoryData; + } catch (IOException e) { + LOG.error("Error loading application attempt history data from: " + + childNodeName, e); + throw e; + } + } + return null; + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(containerHistoryDataRoot, childNodeName); + ContainerId contId = ConverterUtils.toContainerId(childNodeName); + if (!containerId.equals(contId)) { + continue; + } + LOG.info("Loading container history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ContainerHistoryData containerHistoryData = + new ContainerHistoryDataPBImpl( + ContainerHistoryDataProto.parseFrom(childData)); + return containerHistoryData; + } catch (IOException e) { + LOG.error( + "Error loading container history data from: " + childNodeName, e); + throw e; + } + } + return null; + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + ApplicationAttemptHistoryData appAttempt = + getApplicationAttempt(appAttemptId); + if (appAttempt == null) { + return null; + } + return this.getContainer(appAttempt.getMasterContainerId()); + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot); + Map containers = + new HashMap(); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(containerHistoryDataRoot, childNodeName); + ContainerId containerId = ConverterUtils.toContainerId(childNodeName); + if (!containerId.getApplicationAttemptId().equals(appAttemptId)) { + continue; + } + LOG.info("Loading container history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ContainerHistoryData containerHistoryData = + new ContainerHistoryDataPBImpl( + ContainerHistoryDataProto.parseFrom(childData)); + containers.put(containerId, containerHistoryData); + } catch (IOException e) { + LOG.error( + "Error loading container history data from: " + childNodeName, e); + throw e; + } + } + return containers; + } + + protected byte[] readFile(Path inputPath, long len) throws IOException { + FSDataInputStream fsIn = fs.open(inputPath); + // state data will not be that "long" + byte[] data = new byte[(int) len]; + fsIn.readFully(data); + fsIn.close(); + return data; + } + + protected void writeFile(Path outputPath, byte[] data) throws IOException { + FSDataOutputStream fsOut = fs.create(outputPath, false); + fsOut.write(data); + fsOut.close(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java index 0070c35..0136e89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java @@ -112,7 +112,7 @@ public ContainerHistoryData getContainer(ContainerId containerId) { } @Override - public void writeApplication(ApplicationHistoryData app) throws Throwable { + public void writeApplication(ApplicationHistoryData app) throws IOException { if (app != null) { ApplicationHistoryData oldData = applicationData.putIfAbsent(app.getApplicationId(), app); @@ -125,7 +125,7 @@ public void writeApplication(ApplicationHistoryData app) throws Throwable { @Override public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt) - throws Throwable { + throws IOException { if (appAttempt != null) { if (applicationAttemptData.containsKey(appAttempt .getApplicationAttemptId().getApplicationId())) { @@ -151,7 +151,7 @@ public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt) } @Override - public void writeContainer(ContainerHistoryData container) throws Throwable { + public void writeContainer(ContainerHistoryData container) throws IOException { if (container != null) { ContainerHistoryData oldContainer = containerData.putIfAbsent(container.getContainerId(), container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java new file mode 100644 index 0000000..13f551d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFileSystemApplicationHistoryStore { + + private Path fsWorkingPath; + private MiniDFSCluster cluster; + private TestFSApplicationHistoryStore store; + + @Before + public void setup() throws Exception { + cluster = new MiniDFSCluster.Builder( + new HdfsConfiguration()).numDataNodes(1).build(); + store = new TestFSApplicationHistoryStore(); + store.init(new YarnConfiguration()); + store.start(); + Assert.assertTrue(store.fs.exists(store.appHistoryDataRoot)); + Assert.assertTrue(store.fs.exists(store.appAttemptHistoryDataRoot)); + Assert.assertTrue(store.fs.exists(store.containerHistoryDataRoot)); + } + + @After + public void tearDown() throws Exception { + store.stop(); + cluster.getFileSystem().delete(fsWorkingPath, true); + cluster.shutdown(); + } + + @Test + public void testReadWriteApplicationHistoryData() throws Exception { + ApplicationHistoryData appToWrite1 = mockApplicationHistoryData(1); + ApplicationHistoryData appToWrite2 = mockApplicationHistoryData(2); + store.writeApplication(appToWrite1); + store.writeApplication(appToWrite2); + // get application by appId + ApplicationHistoryData appRead1 = + store.getApplication(appToWrite1.getApplicationId()); + Assert.assertNotNull(appRead1); + Assert.assertEquals( + appToWrite1.getApplicationId(), appRead1.getApplicationId()); + ApplicationHistoryData appRead2 = + store.getApplication(appToWrite2.getApplicationId()); + Assert.assertNotNull(appRead2); + Assert.assertEquals( + appToWrite2.getApplicationId(), appRead2.getApplicationId()); + // get all applications + Map applications = + store.getAllApplications(); + Assert.assertEquals(2, applications.size()); + appRead1 = applications.get(appToWrite1.getApplicationId()); + Assert.assertNotNull(appRead1); + Assert.assertEquals( + appToWrite1.getApplicationId(), appRead1.getApplicationId()); + appRead2 = applications.get(appToWrite2.getApplicationId()); + Assert.assertNotNull(appRead2); + Assert.assertEquals( + appToWrite2.getApplicationId(), appRead2.getApplicationId()); + } + + @Test + public void testReadWriteApplicationAttemptHistoryData() throws Exception { + ApplicationAttemptHistoryData appAttemptToWrite1 = + mockApplicationAttemptHistoryData(1, 1); + ApplicationAttemptHistoryData appAttemptToWrite2 = + mockApplicationAttemptHistoryData(1, 2); + store.writeApplicationAttempt(appAttemptToWrite1); + store.writeApplicationAttempt(appAttemptToWrite2); + // get attempt by attemptId + ApplicationAttemptHistoryData appAttemptRead1 = + store.getApplicationAttempt( + appAttemptToWrite1.getApplicationAttemptId()); + Assert.assertNotNull(appAttemptRead1); + Assert.assertEquals(appAttemptToWrite1, appAttemptRead1); + ApplicationAttemptHistoryData appAttemptRead2 = + store.getApplicationAttempt( + appAttemptToWrite2.getApplicationAttemptId()); + Assert.assertNotNull(appAttemptRead2); + Assert.assertEquals(appAttemptToWrite2, appAttemptRead2); + // get attempts by appId + ApplicationId appId = ApplicationId.newInstance(0, 1); + Map appAttempts = + store.getApplicationAttempts(appId); + Assert.assertEquals(2, appAttempts.size()); + appAttemptRead1 = + appAttempts.get(appAttemptToWrite1.getApplicationAttemptId()); + Assert.assertNotNull(appAttemptRead1); + Assert.assertEquals(appAttemptToWrite1, appAttemptRead1); + appAttemptRead2 = + appAttempts.get(appAttemptToWrite2.getApplicationAttemptId()); + Assert.assertNotNull(appAttemptRead2); + Assert.assertEquals(appAttemptToWrite2, appAttemptRead2); + } + + @Test + public void testReadWriteContainerHistoryData() throws Exception { + ApplicationAttemptHistoryData appAttemptToWrite1 = + mockApplicationAttemptHistoryData(1, 1); + ApplicationAttemptHistoryData appAttemptToWrite2 = + mockApplicationAttemptHistoryData(1, 2); + store.writeApplicationAttempt(appAttemptToWrite1); + store.writeApplicationAttempt(appAttemptToWrite2); + ContainerHistoryData containerToWrite1 = + mockContainerHistoryData(1, 1, 1); + ContainerHistoryData containerToWrite2 = + mockContainerHistoryData(1, 2, 1); + ContainerHistoryData containerToWrite3 = + mockContainerHistoryData(1, 2, 2); + store.writeContainer(containerToWrite1); + store.writeContainer(containerToWrite2); + store.writeContainer(containerToWrite3); + // get container by containerId + ContainerHistoryData containerRead1 = + store.getContainer(containerToWrite1.getContainerId()); + Assert.assertNotNull(containerRead1); + Assert.assertEquals(containerToWrite1, containerRead1); + ContainerHistoryData containerRead2 = + store.getContainer(containerToWrite2.getContainerId()); + Assert.assertNotNull(containerRead2); + Assert.assertEquals(containerToWrite2, containerRead2); + ContainerHistoryData containerRead3 = + store.getContainer(containerToWrite3.getContainerId()); + Assert.assertNotNull(containerRead3); + Assert.assertEquals(containerToWrite3, containerRead3); + // get AM container by attemptId + ApplicationAttemptId appAttemptId1 = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); + ApplicationAttemptId appAttemptId2 = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 2); + containerRead1 = store.getAMContainer(appAttemptId1); + Assert.assertNotNull(containerRead1); + Assert.assertEquals(containerToWrite1, containerRead1); + containerRead2 = store.getAMContainer(appAttemptId2); + Assert.assertNotNull(containerRead2); + Assert.assertEquals(containerToWrite2, containerRead2); + // get containers by attemptId + Map containers = + store.getContainers(appAttemptId2); + Assert.assertEquals(2, containers.size()); + containerRead2 = containers.get(containerToWrite2.getContainerId()); + Assert.assertNotNull(containerRead2); + Assert.assertEquals(containerToWrite2, containerRead2); + containerRead3 = containers.get(containerToWrite3.getContainerId()); + Assert.assertNotNull(containerRead3); + Assert.assertEquals(containerToWrite3, containerRead3); + } + + private class TestFSApplicationHistoryStore + extends FileSystemApplicationHistoryStore { + @Override + public void serviceInit(Configuration conf) throws Exception { + fsWorkingPath = + new Path(new Path(cluster.getURI()), new Path("/Test")); + Path rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT); + appAttemptHistoryDataRoot = + new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT); + containerHistoryDataRoot = + new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT); + + fs = cluster.getFileSystem(); + fs.mkdirs(appHistoryDataRoot); + fs.mkdirs(appAttemptHistoryDataRoot); + fs.mkdirs(containerHistoryDataRoot); + } + } + + private static ApplicationHistoryData mockApplicationHistoryData(int appId) { + ApplicationHistoryData app = new ApplicationHistoryDataPBImpl(); + app.setApplicationId( + ApplicationId.newInstance(0, appId)); + return app; + } + + private static ApplicationAttemptHistoryData + mockApplicationAttemptHistoryData(int appId, int appAttemptId) { + ApplicationAttemptHistoryData appAttempt = + new ApplicationAttemptHistoryDataPBImpl(); + appAttempt.setApplicationAttemptId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, appId), appAttemptId)); + appAttempt.setMasterContainerId(ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, appId), appAttemptId), 1)); + return appAttempt; + } + + private static ContainerHistoryData mockContainerHistoryData( + int appId, int appAttemptId, int containerId) { + ContainerHistoryData container = new ContainerHistoryDataPBImpl(); + container.setContainerId(ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, appId), appAttemptId), + containerId)); + return container; + } +}