diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 44c35c3..73b6a6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -261,6 +261,10 @@ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.rm-state-store.uri"; + /** URI for FileSystemHistoryStorage */ + public static final String FS_HISTORY_STORAGE_URI = YARN_PREFIX + + "ahs.fs-history-storage.uri"; + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b6753bc..f460707 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -255,6 +255,15 @@ + URI pointing to the location of the FileSystem path where + the history will be persisted. This must be supplied when using + org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemHistoryStorage + as the value for yarn.resourcemanager.ahs.writer.class + yarn.ahs.fs-history-storage.uri + ${hadoop.tmp.dir}/yarn/system/ahstore + + + The maximum number of completed applications RM keeps. yarn.resourcemanager.max-completed-applications 10000 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java new file mode 100644 index 0000000..6f62a39 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemHistoryStorage.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptHistoryDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationHistoryDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerHistoryDataProto; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + + +public class FileSystemHistoryStorage extends AbstractService + implements ApplicationHistoryReader, ApplicationHistoryWriter { + + private static final Log LOG = LogFactory.getLog(FileSystemHistoryStorage.class); + + private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot"; + private static final String APP_HISTORY_DATA_ROOT = "ApplicationRoot"; + private static final String APP_ATTEMPT_HISTORY_DATA_ROOT = "ApplicationAttemptRoot"; + private static final String CONTAINER_HISTORY_DATA_ROOT = "ContainerRoot"; + + private FileSystem fs; + + private Path rootDirPath; + private Path appHistoryDataRoot; + private Path appAttemptHistoryDataRoot; + private Path containerHistoryDataRoot; + + public FileSystemHistoryStorage() { + super(FileSystemHistoryStorage.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Path fsWorkingPath = + new Path(conf.get(YarnConfiguration.FS_HISTORY_STORAGE_URI)); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + appHistoryDataRoot = new Path(rootDirPath, APP_HISTORY_DATA_ROOT); + appAttemptHistoryDataRoot = + new Path(rootDirPath, APP_ATTEMPT_HISTORY_DATA_ROOT); + containerHistoryDataRoot = + new Path(rootDirPath, CONTAINER_HISTORY_DATA_ROOT); + + try { + // init filesystem + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(appHistoryDataRoot); + fs.mkdirs(appAttemptHistoryDataRoot); + fs.mkdirs(containerHistoryDataRoot); + } catch (IOException e) { + LOG.error("Error init FileSystemHistoryStorage", e); + throw e; + } + } + + @Override + public void serviceStop() throws Exception { + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + LOG.error("Error closing FileSystemHistoryStorage", e); + throw e; + } + } + } + + @Override + public void writeApplication(ApplicationHistoryData app) throws Throwable { + ApplicationId appId = app.getApplicationId(); + Path nodeCreatePath = new Path(appHistoryDataRoot, appId.toString()); + LOG.info("Writing history data for Application: " + appId + + " at: " + nodeCreatePath); + assert app instanceof ApplicationHistoryDataPBImpl; + ApplicationHistoryDataPBImpl appPBImpl = (ApplicationHistoryDataPBImpl) app; + byte[] appHistoryData = appPBImpl.getProto().toByteArray(); + try { + writeFile(nodeCreatePath, appHistoryData); + } catch (IOException e) { + LOG.error("Error storing history data for Application: " + appId, e); + throw e; + } + } + + @Override + public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt) + throws Throwable { + ApplicationAttemptId appAttemptId = appAttempt.getApplicationAttemptId(); + Path nodeCreatePath = + new Path(appAttemptHistoryDataRoot, appAttemptId.toString()); + LOG.info("Writing history data for ApplicationAttempt: " + appAttemptId + + " at: " + nodeCreatePath); + assert appAttempt instanceof ApplicationAttemptHistoryDataPBImpl; + ApplicationAttemptHistoryDataPBImpl appAttemptPBImpl = + (ApplicationAttemptHistoryDataPBImpl) appAttempt; + byte[] appAttemptHistoryData = appAttemptPBImpl.getProto().toByteArray(); + try { + writeFile(nodeCreatePath, appAttemptHistoryData); + } catch (IOException e) { + LOG.error("Error storing history data for ApplicationAttempt: " + + appAttemptId, e); + throw e; + } + } + + @Override + public void writeContainer(ContainerHistoryData container) throws Throwable { + ContainerId containerId = container.getContainerId(); + Path nodeCreatePath = + new Path(containerHistoryDataRoot, containerId.toString()); + LOG.info("Writing history data for Container: " + containerId + + " at: " + nodeCreatePath); + assert container instanceof ContainerHistoryDataPBImpl; + ContainerHistoryDataPBImpl containerPBImpl = + (ContainerHistoryDataPBImpl) container; + byte[] containerHistoryData = containerPBImpl.getProto().toByteArray(); + try { + writeFile(nodeCreatePath, containerHistoryData); + } catch (IOException e) { + LOG.error("Error storing history data for Container: " + + containerId, e); + throw e; + } + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws Throwable { + FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appHistoryDataRoot, childNodeName); + ApplicationId applicationId = + ConverterUtils.toApplicationId(childNodeName); + if (!applicatonId.equals(appId)) { + continue; + } + LOG.info("Loading application history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + return new ApplicationHistoryDataPBImpl( + ApplicationHistoryDataProto.parseFrom(childData)); + } catch (IOException e) { + LOG.error("Error loading application history data from: " + + childNodeName, e); + throw e; + } + } + return null; + } + + @Override + public Map getAllApplications() { + FileStatus[] childNodes = fs.listStatus(appHistoryDataRoot); + Map applications = + new HashMap(); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appHistoryDataRoot, childNodeName); + ApplicationId applicationId = + ConverterUtils.toApplicationId(childNodeName); + LOG.info("Loading application history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ApplicationHistoryData appHistoryData = + new ApplicationHistoryDataPBImpl( + ApplicationHistoryDataProto.parseFrom(childData)); + applications.put(applicationId, appHistoryData); + } catch (IOException e) { + LOG.error("Error loading application history data from: " + + childNodeName, e); + throw e; + } + } + return applications; + } + + @Override + public Map + getApplicationAttempts(ApplicationId applicationId) throws Throwable { + FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot); + Map appAttempts = + new HashMap(); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName); + ApplicationAttemptId applicationAttemptId = + ConverterUtils.toApplicationAttemptId(childNodeName); + if (!applicationAttemptId.getApplicationId().equals(applicationId)) { + continue; + } + LOG.info("Loading application attempt history data from node: " + + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ApplicationAttemptHistoryData appAttemptHistoryData = + new ApplicationAttemptHistoryDataPBImpl( + ApplicationAttemptHistoryDataProto.parseFrom(childData)); + appAttempts.put(applicationAttemptId, appAttemptHistoryData); + } catch (IOException e) { + LOG.error("Error loading application attempt history data from: " + + childNodeName, e); + throw e; + } + } + return appAttempts; + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws Throwable { + FileStatus[] childNodes = fs.listStatus(appAttemptHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(appAttemptHistoryDataRoot, childNodeName); + ApplicationAttemptId applicationAttemptId = + ConverterUtils.toApplicationAttemptId(childNodeName); + if (!applicationAttemptId.equals(appAttemptId)) { + continue; + } + LOG.info("Loading application attempt history data from node: " + + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ApplicationAttemptHistoryData appAttemptHistoryData = + new ApplicationAttemptHistoryDataPBImpl( + ApplicationAttemptHistoryDataProto.parseFrom(childData)); + return appAttemptHistoryData; + } catch (IOException e) { + LOG.error("Error loading application attempt history data from: " + + childNodeName, e); + throw e; + } + } + return null; + } + + @Override + public ContainerHistoryData getContainer( + ContainerId containerId) throws Throwable { + FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(containerHistoryDataRoot, childNodeName); + ContainerId contId = ConverterUtils.toContainerId(childNodeName); + if (!containerId.equals(contId)) { + continue; + } + LOG.info("Loading container history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ContainerHistoryData containerHistoryData = + new ContainerHistoryDataPBImpl( + ContainerHistoryDataProto.parseFrom(childData)); + return containerHistoryData; + } catch (IOException e) { + LOG.error("Error loading container history data from: " + + childNodeName, e); + throw e; + } + } + return null; + } + + @Override + public ContainerHistoryData getAMContainer( + ApplicationAttemptId appAttemptId) throws Throwable { + FileStatus[] childNodes = fs.listStatus(containerHistoryDataRoot); + for (FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = new Path(containerHistoryDataRoot, childNodeName); + ContainerId containerId = ConverterUtils.toContainerId(childNodeName); + if (!containerId.getApplicationAttemptId().equals(appAttemptId)) { + continue; + } + LOG.info("Loading container history data from node: " + childNodeName); + try { + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ContainerHistoryData containerHistoryData = + new ContainerHistoryDataPBImpl( + ContainerHistoryDataProto.parseFrom(childData)); + return containerHistoryData; + } catch (IOException e) { + LOG.error("Error loading container history data from: " + + childNodeName, e); + throw e; + } + } + return null; + } + + private byte[] readFile(Path inputPath, long len) throws IOException { + FSDataInputStream fsIn = fs.open(inputPath); + // state data will not be that "long" + byte[] data = new byte[(int)len]; + fsIn.readFully(data); + fsIn.close(); + return data; + } + + private void writeFile(Path outputPath, byte[] data) throws IOException { + FSDataOutputStream fsOut = fs.create(outputPath, false); + fsOut.write(data); + fsOut.close(); + } + +}