diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fed2574..a2b9e18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -869,6 +869,9 @@ public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type"; public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none"; + /** AHS STORAGE CLASS */ + public static final String AHS_STORE = AHS_PREFIX + "store.class"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b88a9c4..b2a5a51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -894,6 +894,13 @@ yarn.ahs.fs-history-store.compression-type none + + + Store Class Name for History Store, defaulting to file + system store + yarn.ahs.store.class + org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistory.java new file mode 100644 index 0000000..ad3edc3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistory.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; + +public class ApplicationHistory extends AbstractService implements + ApplicationHistoryContext { + private static final Log LOG = LogFactory.getLog(ApplicationHistory.class); + + private ApplicationHistoryStore historyStore; + + public ApplicationHistory() { + super(ApplicationHistory.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + LOG.info("ApplicationHistory Init"); + historyStore = ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.AHS_STORE, FileSystemApplicationHistoryStore.class, + ApplicationHistoryStore.class), conf); + historyStore.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting ApplicationHistory"); + if (historyStore instanceof Service) { + ((Service) historyStore).start(); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping ApplicationHistory"); + if (historyStore != null && historyStore instanceof Service) { + ((Service) historyStore).stop(); + } + super.serviceStop(); + } + + @Override + public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + return convertToContainerReport(historyStore.getAMContainer(appAttemptId)); + } + + @Override + public Map getAllApplications() + throws IOException { + Map histData = + historyStore.getAllApplications(); + HashMap applicationsReport = + new HashMap(); + for (ApplicationId appId : histData.keySet()) { + applicationsReport.put(appId, + convertToApplicationReport(histData.get(appId))); + } + return applicationsReport; + } + + @Override + public ApplicationReport getApplication(ApplicationId appId) + throws IOException { + return convertToApplicationReport(historyStore.getApplication(appId)); + } + + public ApplicationReport convertToApplicationReport( + ApplicationHistoryData appHistory) throws IOException { + ApplicationAttemptHistoryData lastAttempt = getLastAttempt(appHistory + .getApplicationId()); + return ApplicationReport.newInstance(appHistory.getApplicationId(), + lastAttempt.getApplicationAttemptId(), appHistory.getUser(), appHistory + .getQueue(), appHistory.getApplicationName(), + lastAttempt.getHost(), lastAttempt.getRPCPort(), null, appHistory + .getYarnApplicationState(), appHistory.getDiagnosticsInfo(), + lastAttempt.getTrackingURL(), appHistory.getStartTime(), appHistory + .getFinishTime(), appHistory.getFinalApplicationStatus(), null, "", + 100, appHistory.getApplicationType(), null); + } + + public ApplicationAttemptHistoryData getLastAttempt(ApplicationId appId) + throws IOException { + Map attempts = historyStore + .getApplicationAttempts(appId); + ApplicationAttemptId prevMaxAttemptId = null; + for (ApplicationAttemptId attemptId : attempts.keySet()) { + if (prevMaxAttemptId == null) { + prevMaxAttemptId = attemptId; + } else { + if (prevMaxAttemptId.getAttemptId() < attemptId.getAttemptId()) { + prevMaxAttemptId = attemptId; + } + } + } + return attempts.get(prevMaxAttemptId); + } + + public ApplicationAttemptReport convertToApplicationAttemptReport( + ApplicationAttemptHistoryData appAttemptHistory) { + return ApplicationAttemptReport.newInstance(appAttemptHistory + .getApplicationAttemptId(), appAttemptHistory.getHost(), + appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(), + appAttemptHistory.getDiagnosticsInfo(), null, appAttemptHistory + .getMasterContainerId()); + } + + @Override + public ApplicationAttemptReport getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return convertToApplicationAttemptReport(historyStore + .getApplicationAttempt(appAttemptId)); + } + + @Override + public Map getApplicationAttempts( + ApplicationId appId) throws IOException { + Map histData = + historyStore.getApplicationAttempts(appId); + HashMap applicationAttemptsReport = + new HashMap(); + for (ApplicationAttemptId appAttemptId : histData + .keySet()) { + applicationAttemptsReport.put(appAttemptId, + convertToApplicationAttemptReport(histData + .get(appAttemptId))); + } + return applicationAttemptsReport; + } + + @Override + public ContainerReport getContainer(ContainerId containerId) + throws IOException { + return convertToContainerReport(historyStore.getContainer(containerId)); + } + + public ContainerReport convertToContainerReport( + ContainerHistoryData containerHistory) { + return ContainerReport.newInstance(containerHistory.getContainerId(), + containerHistory.getAllocatedResource(), containerHistory + .getAssignedNode(), containerHistory.getPriority(), + containerHistory.getStartTime(), containerHistory.getFinishTime(), + containerHistory.getDiagnosticsInfo(), containerHistory.getLogURL(), + containerHistory.getContainerExitStatus(), containerHistory + .getContainerState()); + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + Map histData = historyStore + .getContainers(appAttemptId); + HashMap containersReport = + new HashMap(); + for (ContainerId container : histData.keySet()) { + containersReport.put(container, convertToContainerReport(histData + .get(container))); + } + return containersReport; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryContext.java new file mode 100644 index 0000000..ac74d39 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryContext.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ApplicationHistoryContext { + /** + * This method returns Application {@link ApplicationReport} for the specified + * {@link ApplicationId}. + * + * @return {@link ApplicationReport} for the ApplicationId. + * @throws {@link IOException} + */ + @Public + @Unstable + ApplicationReport getApplication(ApplicationId appId) throws IOException; + + /** + * This method returns all Application {@link ApplicationReport}s + * + * @return map {@link ApplicationId, @link ApplicationReport}s. + * @throws {@link IOException} + */ + @Public + @Unstable + Map getAllApplications() throws IOException; + + /** + * Application can have multiple application attempts + * {@link ApplicationAttemptReport}. This method returns the all + * {@link ApplicationAttemptReport}s for the Application. + * + * @return all {@link ApplicationAttemptReport}s for the Application. + * @throws {@link IOException} + */ + @Public + @Unstable + Map getApplicationAttempts( + ApplicationId appId) throws IOException; + + /** + * This method returns {@link ApplicationAttemptReport} for specified + * {@link ApplicationId}. + * + * @param {@link ApplicationAttemptId} + * @return {@link ApplicationAttemptReport} for ApplicationAttemptId + * @throws {@link IOException} + */ + @Public + @Unstable + ApplicationAttemptReport getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException; + + /** + * This method returns {@link ContainerReport} for specified {@link ContainerId}. + * + * @param {@link ContainerId} + * @return {@link Container} for ContainerId + * @throws {@link IOException} + */ + @Public + @Unstable + ContainerReport getContainer(ContainerId containerId) throws IOException; + + /** + * This method returns {@link ContainerReport} for specified + * {@link ApplicationAttemptId}. + * + * @param {@link ApplicationAttemptId} + * @return {@link Container} for ApplicationAttemptId + * @throws {@link IOException} + */ + @Public + @Unstable + ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException; + + /** + * This method returns Map{@link ContainerId,@link ContainerReport} for specified + * {@link ApplicationAttemptId}. + * + * @param {@link ApplicationAttemptId} + * @return Map{@link ContainerId, @link ContainerReport} for ApplicationAttemptId + * @throws {@link IOException} + */ + @Public + @Unstable + Map getContainers(ApplicationAttemptId appAttemptId) + throws IOException; +}