diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fed2574..49d1554 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -869,6 +869,9 @@ public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type"; public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none"; + /** AHS store class */ + public static final String AHS_STORE = AHS_PREFIX + "store.class"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b88a9c4..bdb89cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -894,6 +894,13 @@ yarn.ahs.fs-history-store.compression-type none + + + Store class name for history store, defaulting to file + system store + yarn.ahs.store.class + org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java new file mode 100644 index 0000000..e271bab --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ApplicationHistoryManager { + /** + * This method returns Application {@link ApplicationReport} for the specified + * {@link ApplicationId}. + * + * @return {@link ApplicationReport} for the ApplicationId. + * @throws {@link IOException} + */ + @Public + @Unstable + ApplicationReport getApplication(ApplicationId appId) throws IOException; + + /** + * This method returns all Application {@link ApplicationReport}s + * + * @return map {@link ApplicationId, @link ApplicationReport}s. + * @throws {@link IOException} + */ + @Public + @Unstable + Map getAllApplications() throws IOException; + + /** + * Application can have multiple application attempts + * {@link ApplicationAttemptReport}. This method returns the all + * {@link ApplicationAttemptReport}s for the Application. + * + * @return all {@link ApplicationAttemptReport}s for the Application. + * @throws {@link IOException} + */ + @Public + @Unstable + Map getApplicationAttempts( + ApplicationId appId) throws IOException; + + /** + * This method returns {@link ApplicationAttemptReport} for specified + * {@link ApplicationId}. + * + * @param {@link ApplicationAttemptId} + * @return {@link ApplicationAttemptReport} for ApplicationAttemptId + * @throws {@link IOException} + */ + @Public + @Unstable + ApplicationAttemptReport getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException; + + /** + * This method returns {@link ContainerReport} for specified + * {@link ContainerId}. + * + * @param {@link ContainerId} + * @return {@link Container} for ContainerId + * @throws {@link IOException} + */ + @Public + @Unstable + ContainerReport getContainer(ContainerId containerId) throws IOException; + + /** + * This method returns {@link ContainerReport} for specified + * {@link ApplicationAttemptId}. + * + * @param {@link ApplicationAttemptId} + * @return {@link Container} for ApplicationAttemptId + * @throws {@link IOException} + */ + @Public + @Unstable + ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException; + + /** + * This method returns Map{@link ContainerId,@link ContainerReport} for + * specified {@link ApplicationAttemptId}. + * + * @param {@link ApplicationAttemptId} + * @return Map{@link ContainerId, @link ContainerReport} for + * ApplicationAttemptId + * @throws {@link IOException} + */ + @Public + @Unstable + Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java new file mode 100644 index 0000000..6f13e74 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; + +import com.google.common.annotations.VisibleForTesting; + +public class ApplicationHistoryManagerImpl extends AbstractService implements + ApplicationHistoryManager { + private static final Log LOG = LogFactory + .getLog(ApplicationHistoryManagerImpl.class); + private static final String UNAVAILABLE = "N/A"; + + private ApplicationHistoryStore historyStore; + + public ApplicationHistoryManagerImpl() { + super(ApplicationHistoryManagerImpl.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + LOG.info("ApplicationHistory Init"); + historyStore = ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.AHS_STORE, FileSystemApplicationHistoryStore.class, + ApplicationHistoryStore.class), conf); + historyStore.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting ApplicationHistory"); + historyStore.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping ApplicationHistory"); + historyStore.stop(); + super.serviceStop(); + } + + @Override + public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + return convertToContainerReport(historyStore.getAMContainer(appAttemptId)); + } + + @Override + public Map getAllApplications() + throws IOException { + Map histData = historyStore + .getAllApplications(); + HashMap applicationsReport = new HashMap(); + for (ApplicationId appId : histData.keySet()) { + applicationsReport.put(appId, convertToApplicationReport(histData + .get(appId))); + } + return applicationsReport; + } + + @Override + public ApplicationReport getApplication(ApplicationId appId) + throws IOException { + return convertToApplicationReport(historyStore.getApplication(appId)); + } + + private ApplicationReport convertToApplicationReport( + ApplicationHistoryData appHistory) throws IOException { + ApplicationAttemptId currentApplicationAttemptId = null; + String trackingUrl = UNAVAILABLE; + String host = UNAVAILABLE; + int rpcPort = -1; + + ApplicationAttemptHistoryData lastAttempt = getLastAttempt(appHistory + .getApplicationId()); + if (lastAttempt != null) { + currentApplicationAttemptId = lastAttempt.getApplicationAttemptId(); + trackingUrl = lastAttempt.getTrackingURL(); + host = lastAttempt.getHost(); + rpcPort = lastAttempt.getRPCPort(); + } + return ApplicationReport.newInstance(appHistory.getApplicationId(), + currentApplicationAttemptId, appHistory.getUser(), appHistory + .getQueue(), appHistory.getApplicationName(), host, rpcPort, null, + appHistory.getYarnApplicationState(), appHistory.getDiagnosticsInfo(), + trackingUrl, appHistory.getStartTime(), appHistory.getFinishTime(), + appHistory.getFinalApplicationStatus(), null, "", 100, appHistory + .getApplicationType(), null); + } + + private ApplicationAttemptHistoryData getLastAttempt(ApplicationId appId) + throws IOException { + Map attempts = historyStore + .getApplicationAttempts(appId); + ApplicationAttemptId prevMaxAttemptId = null; + for (ApplicationAttemptId attemptId : attempts.keySet()) { + if (prevMaxAttemptId == null) { + prevMaxAttemptId = attemptId; + } else { + if (prevMaxAttemptId.getAttemptId() < attemptId.getAttemptId()) { + prevMaxAttemptId = attemptId; + } + } + } + return attempts.get(prevMaxAttemptId); + } + + private ApplicationAttemptReport convertToApplicationAttemptReport( + ApplicationAttemptHistoryData appAttemptHistory) { + return ApplicationAttemptReport.newInstance(appAttemptHistory + .getApplicationAttemptId(), appAttemptHistory.getHost(), + appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(), + appAttemptHistory.getDiagnosticsInfo(), null, appAttemptHistory + .getMasterContainerId()); + } + + @Override + public ApplicationAttemptReport getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return convertToApplicationAttemptReport(historyStore + .getApplicationAttempt(appAttemptId)); + } + + @Override + public Map getApplicationAttempts( + ApplicationId appId) throws IOException { + Map histData = historyStore + .getApplicationAttempts(appId); + HashMap applicationAttemptsReport = new HashMap(); + for (ApplicationAttemptId appAttemptId : histData.keySet()) { + applicationAttemptsReport.put(appAttemptId, + convertToApplicationAttemptReport(histData.get(appAttemptId))); + } + return applicationAttemptsReport; + } + + @Override + public ContainerReport getContainer(ContainerId containerId) + throws IOException { + return convertToContainerReport(historyStore.getContainer(containerId)); + } + + private ContainerReport convertToContainerReport( + ContainerHistoryData containerHistory) { + return ContainerReport.newInstance(containerHistory.getContainerId(), + containerHistory.getAllocatedResource(), containerHistory + .getAssignedNode(), containerHistory.getPriority(), + containerHistory.getStartTime(), containerHistory.getFinishTime(), + containerHistory.getDiagnosticsInfo(), containerHistory.getLogURL(), + containerHistory.getContainerExitStatus(), containerHistory + .getContainerState()); + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + Map histData = historyStore + .getContainers(appAttemptId); + HashMap containersReport = new HashMap(); + for (ContainerId container : histData.keySet()) { + containersReport.put(container, convertToContainerReport(histData + .get(container))); + } + return containersReport; + } + + @Private + @VisibleForTesting + public ApplicationHistoryStore getHistoryStore() { + return this.historyStore; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerImpl.java new file mode 100644 index 0000000..15791f3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestApplicationHistoryManagerImpl extends + ApplicationHistoryStoreTestUtils { + ApplicationHistoryManagerImpl applicationHistoryManagerImpl = null; + + @Before + public void setup() throws Exception { + Configuration config = new Configuration(); + config.setClass(YarnConfiguration.AHS_STORE, + MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); + applicationHistoryManagerImpl = new ApplicationHistoryManagerImpl(); + applicationHistoryManagerImpl.init(config); + applicationHistoryManagerImpl.start(); + store = applicationHistoryManagerImpl.getHistoryStore(); + } + + @After + public void tearDown() throws Exception { + applicationHistoryManagerImpl.stop(); + } + + @Test + public void testApplicationReport() throws IOException, YarnException { + ApplicationId appId = null; + appId = ApplicationId.newInstance(0, 1); + writeApplicationStartData(appId); + writeApplicationFinishData(appId); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 1); + writeApplicationAttemptStartData(appAttemptId); + writeApplicationAttemptFinishData(appAttemptId); + ApplicationReport appReport = applicationHistoryManagerImpl + .getApplication(appId); + Assert.assertNotNull(appReport); + Assert.assertEquals(appId, appReport.getApplicationId()); + Assert.assertEquals(appAttemptId, appReport + .getCurrentApplicationAttemptId()); + Assert.assertEquals(appAttemptId.toString(), appReport.getHost()); + Assert.assertEquals("test type", appReport.getApplicationType().toString()); + Assert.assertEquals("test queue", appReport.getQueue().toString()); + } +}