diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java index ccbe6f1..9c27076 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java @@ -36,6 +36,10 @@ @Private @Unstable + public static final String containerIdStrPrefix = "container_"; + + @Private + @Unstable public static ContainerId newInstance(ApplicationAttemptId appAttemptId, int containerId) { ContainerId id = Records.newRecord(ContainerId.class); @@ -137,7 +141,7 @@ public int compareTo(ContainerId other) { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("container_"); + sb.append(containerIdStrPrefix); ApplicationId appId = getApplicationAttemptId().getApplicationId(); sb.append(appId.getClusterTimestamp()).append("_"); sb.append(ApplicationId.appIdFormat.get().format(appId.getId())) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index febf095..9e950b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -751,6 +751,19 @@ YARN_PREFIX + "app.container.log.filesize"; //////////////////////////////// + // AHS Configs + //////////////////////////////// + + public static final String AHS_PREFIX = YARN_PREFIX + "ahs."; + + /** URI for FileSystemApplicationHistoryStore */ + public static final String FS_HISTORY_STORE_URI = AHS_PREFIX 
+ "fs-history-store.uri"; + + /** T-file compression types used to compress history data.*/ + public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type"; + public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none"; + + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ab8d50a..ee1eb30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -755,6 +755,23 @@ $HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/* + + + + URI pointing to the location of the FileSystem path where + the history will be persisted. This must be supplied when using + org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore + as the value for yarn.resourcemanager.ahs.writer.class + yarn.ahs.fs-history-store.uri + ${hadoop.tmp.dir}/yarn/system/ahstore + + + + T-file compression types used to compress history data. 
+ yarn.ahs.fs-history-store.compression-type + none + + The interval of the yarn client's querying application state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index 33350b4..e3d9bd9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -40,5 +40,11 @@ org.apache.hadoop hadoop-yarn-server-common + + org.apache.hadoop + hadoop-hdfs + test-jar + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java new file mode 100644 index 0000000..ca778a8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java @@ -0,0 +1,847 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.file.tfile.TFile; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto; +import 
org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * File system implementation of {@link ApplicationHistoryStore}. 
In this + * implementation, one application will have just one file in the file system, + * which contains all the history data of one application, and its attempts and + * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to + * be invoked first when writing any history data of one application and it will + * open a file, while {@link #applicationFinished(ApplicationFinishData)} is + * supposed to be last writing operation and will close the file. + */ +@Private +@Unstable +public class FileSystemApplicationHistoryStore extends AbstractService + implements ApplicationHistoryStore { + + private static final Log LOG = LogFactory + .getLog(FileSystemApplicationHistoryStore.class); + + private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot"; + private static final int MIN_BLOCK_SIZE = 256 * 1024; + private static final String START_DATA_SUFFIX = "_start"; + private static final String FINISH_DATA_SUFFIX = "_finish"; + + private FileSystem fs; + private Path rootDirPath; + + private ConcurrentMap outstandingWriters = + new ConcurrentHashMap(); + + public FileSystemApplicationHistoryStore() { + super(FileSystemApplicationHistoryStore.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Path fsWorkingPath = new Path( + conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + try { + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(rootDirPath); + } catch (IOException e) { + LOG.error("Error when initializing FileSystemHistoryStorage", e); + throw e; + } + super.serviceInit(conf); + } + + @Override + public void serviceStop() throws Exception { + try { + boolean flag = false; + for (Entry entry : outstandingWriters + .entrySet()) { + try { + entry.getValue().close(); + LOG.info("Closed history file of application " + entry.getKey()); + } catch (IOException e) { + LOG.error("Error when closing history file of application " + + 
entry.getKey()); + flag = true; + } + } + if (flag) { + throw new IOException("Error when closing history files"); + } + outstandingWriters.clear(); + } finally { + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + LOG.error("Error when stopping FileSystemHistoryStorage", e); + throw e; + } + } + } + super.serviceStop(); + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + HistoryFileReader hfReader = getHistoryFileReader(appId); + if (hfReader == null) { + return null; + } + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationHistoryData historyData = + ApplicationHistoryData.newInstance( + appId, null, null, null, null, Long.MIN_VALUE, Long.MIN_VALUE, + Long.MAX_VALUE, null, FinalApplicationStatus.UNDEFINED); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.equals(appId + START_DATA_SUFFIX)) { + ApplicationStartData startData = + new ApplicationStartDataPBImpl( + ApplicationStartDataProto.parseFrom(entry.value)); + mergeApplicationHistoryData(historyData, startData, null); + readStartData = false; + } else if (entry.key.equals(appId + FINISH_DATA_SUFFIX)) { + ApplicationFinishData finishData = + new ApplicationFinishDataPBImpl( + ApplicationFinishDataProto.parseFrom(entry.value)); + mergeApplicationHistoryData(historyData, null, finishData); + readFinishData = true; + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application " + appId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application " + appId); + } + LOG.info("Completed reading history information of application " + appId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application " + appId); + throw e; + } finally { + hfReader.close(); 
+ } + } + + @Override + public Map getAllApplications() + throws IOException { + Map historyDataMap = + new HashMap(); + FileStatus[] files = fs.listStatus(rootDirPath); + for (FileStatus file : files) { + ApplicationId appId = + ConverterUtils.toApplicationId(file.getPath().getName()); + try { + ApplicationHistoryData historyData = getApplication(appId); + historyDataMap.put(appId, historyData); + } catch (IOException e) { + // Eat the exception not to disturb the getting the next + // ApplicationHistoryData + historyDataMap.put(appId, null); + } + } + return historyDataMap; + } + + @Override + public Map + getApplicationAttempts(ApplicationId appId) throws IOException { + Map historyDataMap = + new HashMap(); + Map> startFinshDataMap = + new HashMap>(); + HistoryFileReader hfReader = getHistoryFileReader(appId); + if (hfReader == null) { + return historyDataMap; + } + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + if (entry.key.endsWith(START_DATA_SUFFIX)) { + retriveStartFinishData(appId, entry, startFinshDataMap, true); + } else { + retriveStartFinishData(appId, entry, startFinshDataMap, false); + } + } + } + LOG.info("Completed reading history information of all application" + + " attempts of application " + appId); + } catch (IOException e) { + LOG.info("Error when reading history information of some application" + + " attempts of application " + appId); + } finally { + hfReader.close(); + } + for (Map.Entry> entry : startFinshDataMap + .entrySet()) { + ApplicationAttemptHistoryData historyData = + ApplicationAttemptHistoryData.newInstance( + entry.getKey(), null, -1, null, null, null, + FinalApplicationStatus.UNDEFINED); + mergeApplicationAttemptHistoryData(historyData, + entry.getValue().startData, entry.getValue().finishDtata); + historyDataMap.put(entry.getKey(), historyData); + } + return historyDataMap; + } + + private + void + 
retriveStartFinishData( + ApplicationId appId, + HistoryFileReader.Entry entry, + Map> startFinshDataMap, + boolean start) throws IOException { + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(entry.key.substring( + 0, + entry.key.length() + - (start ? START_DATA_SUFFIX.length() : FINISH_DATA_SUFFIX + .length()))); + if (appAttemptId.getApplicationId().equals(appId)) { + StartFinishDataPair pair = + startFinshDataMap.get(appAttemptId); + if (pair == null) { + pair = + new StartFinishDataPair(); + startFinshDataMap.put(appAttemptId, pair); + } + if (start) { + ApplicationAttemptStartData startData = + new ApplicationAttemptStartDataPBImpl( + ApplicationAttemptStartDataProto.parseFrom(entry.value)); + pair.startData = startData; + } else { + ApplicationAttemptFinishData finishData = + new ApplicationAttemptFinishDataPBImpl( + ApplicationAttemptFinishDataProto.parseFrom(entry.value)); + pair.finishDtata = finishData; + } + } + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + if (hfReader == null) { + return null; + } + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationAttemptHistoryData historyData = + ApplicationAttemptHistoryData.newInstance( + appAttemptId, null, -1, null, null, null, + FinalApplicationStatus.UNDEFINED); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.equals(appAttemptId + START_DATA_SUFFIX)) { + ApplicationAttemptStartData startData = + new ApplicationAttemptStartDataPBImpl( + ApplicationAttemptStartDataProto.parseFrom(entry.value)); + mergeApplicationAttemptHistoryData(historyData, startData, null); + readStartData = true; + } else if (entry.key.equals(appAttemptId + FINISH_DATA_SUFFIX)) { + ApplicationAttemptFinishData 
finishData = + new ApplicationAttemptFinishDataPBImpl( + ApplicationAttemptFinishDataProto.parseFrom(entry.value)); + mergeApplicationAttemptHistoryData(historyData, null, finishData); + readFinishData = true; + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application attempt " + + appAttemptId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application attempt " + + appAttemptId); + } + LOG.info("Completed reading history information of application attempt " + + appAttemptId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application attempt" + + appAttemptId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(containerId.getApplicationAttemptId() + .getApplicationId()); + if (hfReader == null) { + return null; + } + try { + boolean readStartData = false; + boolean readFinishData = false; + ContainerHistoryData historyData = + ContainerHistoryData.newInstance(containerId, null, null, null, + Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.equals(containerId + START_DATA_SUFFIX)) { + ContainerStartData startData = + new ContainerStartDataPBImpl( + ContainerStartDataProto.parseFrom(entry.value)); + mergeContainerHistoryData(historyData, startData, null); + readStartData = false; + } else if (entry.key.equals(containerId + FINISH_DATA_SUFFIX)) { + ContainerFinishData finishData = + new ContainerFinishDataPBImpl( + ContainerFinishDataProto.parseFrom(entry.value)); + mergeContainerHistoryData(historyData, null, finishData); + readFinishData = true; + } + } + if (!readStartData && 
!readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for container " + containerId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for container " + containerId); + } + LOG.info("Completed reading history information of container " + + containerId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of container " + containerId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + ApplicationAttemptHistoryData attemptHistoryData = + getApplicationAttempt(appAttemptId); + if (attemptHistoryData == null + || attemptHistoryData.getMasterContainerId() == null) { + return null; + } + return getContainer(attemptHistoryData.getMasterContainerId()); + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + Map historyDataMap = + new HashMap(); + Map> startFinshDataMap = + new HashMap>(); + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + if (hfReader == null) { + return historyDataMap; + } + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.startsWith(ContainerId.containerIdStrPrefix)) { + if (entry.key.endsWith(START_DATA_SUFFIX)) { + retriveStartFinishData(appAttemptId, entry, startFinshDataMap, true); + } else { + retriveStartFinishData(appAttemptId, entry, startFinshDataMap, + false); + } + } + } + LOG.info("Completed reading history information of all conatiners" + + " of application attempt " + appAttemptId); + } catch (IOException e) { + LOG.info("Error when reading history information of some containers" + + " of application attempt " + appAttemptId); + } finally { + hfReader.close(); + } + for (Map.Entry> entry : startFinshDataMap + .entrySet()) { + ContainerHistoryData historyData = + 
ContainerHistoryData.newInstance(entry.getKey(), null, null, null, + Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE); + mergeContainerHistoryData(historyData, entry.getValue().startData, + entry.getValue().finishDtata); + historyDataMap.put(entry.getKey(), historyData); + } + return historyDataMap; + } + + private + void + retriveStartFinishData( + ApplicationAttemptId appAttemptId, + HistoryFileReader.Entry entry, + Map> startFinshDataMap, + boolean start) throws IOException { + ContainerId containerId = + ConverterUtils.toContainerId(entry.key.substring( + 0, + entry.key.length() + - (start ? START_DATA_SUFFIX.length() : FINISH_DATA_SUFFIX + .length()))); + if (containerId.getApplicationAttemptId().equals(appAttemptId)) { + StartFinishDataPair pair = + startFinshDataMap.get(containerId); + if (pair == null) { + pair = + new StartFinishDataPair(); + startFinshDataMap.put(containerId, pair); + } + if (start) { + ContainerStartData startData = new ContainerStartDataPBImpl( + ContainerStartDataProto.parseFrom(entry.value)); + pair.startData = startData; + } else { + ContainerFinishData finishData = new ContainerFinishDataPBImpl( + ContainerFinishDataProto.parseFrom(entry.value)); + pair.finishDtata = finishData; + } + } + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + HistoryFileWriter hfWriter = + outstandingWriters.get(appStart.getApplicationId()); + if (hfWriter == null) { + Path applicationHistoryFile = + new Path(rootDirPath, appStart.getApplicationId().toString()); + try { + hfWriter = new HistoryFileWriter(applicationHistoryFile); + LOG.info("Opened history file of application " + + appStart.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when openning history file of application " + + appStart.getApplicationId()); + throw e; + } + outstandingWriters.put(appStart.getApplicationId(), hfWriter); + } else { + throw new IOException("History file of application " + + 
appStart.getApplicationId() + " is already opened"); + } + assert appStart instanceof ApplicationStartDataPBImpl; + try { + hfWriter.writeHistoryData(appStart.getApplicationId().toString() + + START_DATA_SUFFIX, ((ApplicationStartDataPBImpl) appStart) + .getProto().toByteArray()); + LOG.info("Start information of application " + + appStart.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application " + + appStart.getApplicationId()); + throw e; + } + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appFinish.getApplicationId()); + assert appFinish instanceof ApplicationFinishDataPBImpl; + try { + hfWriter.writeHistoryData( + appFinish.getApplicationId() + FINISH_DATA_SUFFIX, + ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray()); + LOG.info("Finish information of application " + + appFinish.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application " + + appFinish.getApplicationId()); + throw e; + } + // Not put close() in finally block in case callers want to retry writing + // the data. On the other hand, the file will anyway be close when the + // store is stopped. 
+ try { + hfWriter.close(); + LOG.info("Closed history file of application " + + appFinish.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when closing history file of application " + + appFinish.getApplicationId()); + throw e; + } + outstandingWriters.remove(appFinish.getApplicationId()); + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptStart.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl; + try { + hfWriter.writeHistoryData( + appAttemptStart.getApplicationAttemptId() + START_DATA_SUFFIX, + ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto() + .toByteArray()); + LOG.info("Start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application attempt " + + appAttemptStart.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl; + try { + hfWriter.writeHistoryData( + appAttemptFinish.getApplicationAttemptId() + FINISH_DATA_SUFFIX, + ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto() + .toByteArray()); + LOG.info("Finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws 
IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerStart.getContainerId() + .getApplicationAttemptId() + .getApplicationId()); + assert containerStart instanceof ContainerStartDataPBImpl; + try { + hfWriter.writeHistoryData( + containerStart.getContainerId() + START_DATA_SUFFIX, + ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray()); + LOG.info("Start information of container " + + containerStart.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of container " + + containerStart.getContainerId()); + throw e; + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerFinish.getContainerId() + .getApplicationAttemptId().getApplicationId()); + assert containerFinish instanceof ContainerFinishDataPBImpl; + try { + hfWriter.writeHistoryData( + containerFinish.getContainerId() + FINISH_DATA_SUFFIX, + ((ContainerFinishDataPBImpl) containerFinish).getProto() + .toByteArray()); + LOG.info("Finish information of container " + + containerFinish.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of container " + + containerFinish.getContainerId()); + } + } + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, + ApplicationStartData startData, + ApplicationFinishData finishData) { + if (startData != null) { + historyData.setApplicationName(startData.getApplicationName()); + historyData.setApplicationType(startData.getApplicationType()); + historyData.setQueue(startData.getQueue()); + historyData.setUser(startData.getUser()); + historyData.setSubmitTime(startData.getSubmitTime()); + historyData.setStartTime(startData.getStartTime()); + } + if (finishData != null) { + historyData.setFinishTime(finishData.getFinishTime()); + 
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + } + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptStartData startData, + ApplicationAttemptFinishData finishData) { + if (startData != null) { + historyData.setHost(startData.getHost()); + historyData.setRPCPort(startData.getRPCPort()); + historyData.setMasterContainerId(startData.getMasterContainerId()); + } + if (finishData != null) { + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setTrackingURL(finishData.getTrackingURL()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + } + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerStartData startData, + ContainerFinishData finishData) { + if (startData != null) { + historyData.setAllocatedResource(startData.getAllocatedResource()); + historyData.setAssignedNode(startData.getAssignedNode()); + historyData.setPriority(startData.getPriority()); + historyData.setStartTime(startData.getStartTime()); + } + if (finishData != null) { + historyData.setFinishTime(finishData.getFinishTime()); + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setLogURL(finishData.getLogURL()); + historyData.setContainerExitStatus(finishData + .getContainerExitStatus()); + } + } + + private HistoryFileWriter getHistoryFileWriter(ApplicationId appId) + throws IOException { + HistoryFileWriter hfWriter = outstandingWriters.get(appId); + if (hfWriter == null) { + throw new IOException("History file of application " + appId + + " is not opened"); + } + return hfWriter; + } + + private HistoryFileReader getHistoryFileReader(ApplicationId appId) + throws IOException { + Path applicationHistoryFile = new Path(rootDirPath, appId.toString()); + if 
(!fs.exists(applicationHistoryFile)) { + return null; + } + // The history file is still under writing + if (outstandingWriters.containsKey(appId)) { + return null; + } + return new HistoryFileReader(applicationHistoryFile); + } + + private class HistoryFileReader { + + private class Entry { + + private String key; + private byte[] value; + + public Entry(String key, byte[] value) { + this.key = key; + this.value = value; + } + } + + private FSDataInputStream fsdis; + private TFile.Reader reader; + private TFile.Reader.Scanner scanner; + + public HistoryFileReader(Path historyFile) throws IOException { + FSDataInputStream fsdis = fs.open(historyFile); + reader = + new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(), + getConfig()); + reset(); + } + + public boolean hasNext() { + return !scanner.atEnd(); + } + + public Entry next() throws IOException { + TFile.Reader.Scanner.Entry entry = scanner.entry(); + DataInputStream dis = entry.getKeyStream(); + String key = dis.readUTF(); + dis = entry.getValueStream(); + byte[] value = new byte[entry.getValueLength()]; + dis.read(value); + scanner.advance(); + return new Entry(key, value); + } + + public void reset() throws IOException { + if (scanner != null) { + scanner.close(); + } + scanner = reader.createScanner(); + } + + public void close() throws IOException { + try { + try { + if (scanner != null) { + scanner.close(); + } + } finally { + if (reader != null) { + reader.close(); + } + } + } finally { + if (fsdis != null) { + fsdis.close(); + } + } + } + } + + private class HistoryFileWriter { + + private FSDataOutputStream fsdos; + private TFile.Writer writer; + + public HistoryFileWriter(Path historyFile) + throws IOException { + if (fs.exists(historyFile)) { + fsdos = fs.append(historyFile); + } else { + fsdos = fs.create(historyFile); + } + writer = + new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get( + YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE, + 
YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE), + null, getConfig()); + } + + public synchronized void close() throws IOException { + try { + if (writer != null) { + writer.close(); + } + } finally { + if (fsdos != null) { + fsdos.close(); + } + } + } + + public synchronized void writeHistoryData(String key, byte[] value) + throws IOException { + DataOutputStream dos = null; + try { + dos = writer.prepareAppendKey(-1); + dos.writeUTF(key); + } finally { + if (dos != null) { + dos.close(); + } + } + try { + dos = writer.prepareAppendValue(value.length); + dos.write(value); + } finally { + if (dos != null) { + dos.close(); + } + } + } + + } + + private static class StartFinishDataPair { + + private S startData; + private F finishDtata; + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java new file mode 100644 index 0000000..4777400 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFileSystemApplicationHistoryStore extends + TestApplicationHistoryStore { + + private static MiniDFSCluster cluster; + private static Path fsWorkingPath; + private static Configuration conf; + private static FileSystemApplicationHistoryStore fsstore; + + @BeforeClass + public static void classSetup() throws Exception { + cluster = + new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(1) + .build(); + fsWorkingPath = new Path(new Path(cluster.getURI()), new Path("/Test")); + conf = new Configuration(); + 
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, fsWorkingPath.toString()); + fsstore = new FileSystemApplicationHistoryStore(); + fsstore.init(conf); + fsstore.start(); + } + + @AfterClass + public static void classTearDown() throws Exception { + fsstore.stop(); + cluster.getFileSystem().delete(fsWorkingPath, true); + cluster.shutdown(); + } + + @Before + public void setup() { + store = fsstore; + } + + // The order of the test cases matters + @Test + public void testWriteHistoryData() throws IOException { + int num = 5; + // write application start data + for (int i = 1; i <= num; ++i) { + ApplicationId appId = ApplicationId.newInstance(0, i); + writeApplicationStartData(appId); + } + // write application attempt history data + ApplicationId appId = ApplicationId.newInstance(0, 1); + for (int i = 1; i <= num; ++i) { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, i); + writeApplicationAttemptStartData(appAttemptId); + writeApplicationAttemptFinishData(appAttemptId); + } + // write container history data + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + for (int i = 1; i <= num; ++i) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, i); + writeContainerStartData(containerId); + writeContainerFinishData(containerId); + } + // write application finish data + for (int i = 1; i <= num; ++i) { + appId = ApplicationId.newInstance(0, i); + writeApplicationFinishData(appId); + } + } + + @Test + public void testReadHistoryData() throws IOException { + int num = 5; + // read application history data + Assert.assertEquals(num, store.getAllApplications().size()); + for (int i = 1; i <= num; ++i) { + ApplicationId appId = ApplicationId.newInstance(0, i); + ApplicationHistoryData data = store.getApplication(appId); + Assert.assertNotNull(data); + Assert.assertEquals(appId.toString(), data.getApplicationName()); + Assert.assertEquals(appId.toString(), data.getDiagnosticsInfo()); + } + // read 
application attempt history data + ApplicationId appId = ApplicationId.newInstance(0, 1); + Assert.assertEquals( + num, store.getApplicationAttempts(appId).size()); + for (int i = 1; i <= num; ++i) { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, i); + ApplicationAttemptHistoryData data = + store.getApplicationAttempt(appAttemptId); + Assert.assertNotNull(data); + Assert.assertEquals(appAttemptId.toString(), data.getHost()); + Assert.assertEquals(appAttemptId.toString(), data.getDiagnosticsInfo()); + } + // read container history data + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + Assert.assertEquals( + num, store.getContainers(appAttemptId).size()); + for (int i = 1; i <= num; ++i) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, i); + ContainerHistoryData data = store.getContainer(containerId); + Assert.assertNotNull(data); + Assert.assertEquals(Priority.newInstance(containerId.getId()), + data.getPriority()); + Assert.assertEquals(containerId.toString(), data.getDiagnosticsInfo()); + } + ContainerHistoryData masterContainer = + store.getAMContainer(appAttemptId); + Assert.assertNotNull(masterContainer); + Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1), + masterContainer.getContainerId()); + } + + @Test + public void testWriteAfterApplicationFinish() throws IOException { + ApplicationId appId = + ApplicationId.newInstance(0, store.getAllApplications().size() + 1); + writeApplicationStartData(appId); + writeApplicationFinishData(appId); + // write application attempt history data + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + try { + writeApplicationAttemptStartData(appAttemptId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is not opened")); + } + try { + writeApplicationAttemptFinishData(appAttemptId); + Assert.fail(); + } catch (IOException e) { + 
Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+    // write container history data
+    // Container writes must also be rejected once the application's history
+    // file has been closed by writeApplicationFinishData above.
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+    try {
+      writeContainerStartData(containerId);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+    try {
+      writeContainerFinishData(containerId);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+  }
+
+}