diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java index 0070c35..309ad4b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java @@ -19,46 +19,55 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; -public class MemoryApplicationHistoryStore implements ApplicationHistoryStore { - - private static MemoryApplicationHistoryStore memStore = null; +/** + * In-memory implementation of {@link ApplicationHistoryStore}. + * This implementation is for test purpose only. If users improperly instantiate + * it, they may encounter reading and writing history data in different memory + * store. + * + */ +@Private +@Unstable +public class MemoryApplicationHistoryStore extends AbstractService + implements ApplicationHistoryStore { - private ConcurrentHashMap applicationData = + private final ConcurrentMap applicationData = new ConcurrentHashMap(); - private ConcurrentHashMap> applicationAttemptData = - new ConcurrentHashMap>(); - private ConcurrentHashMap containerData = - new ConcurrentHashMap(); + private final ConcurrentMap> applicationAttemptData = + new ConcurrentHashMap>(); + private final ConcurrentMap> containerData = + new ConcurrentHashMap>(); - private MemoryApplicationHistoryStore() { - } - - public static MemoryApplicationHistoryStore getMemoryStore() { - if (memStore == null) { - memStore = new MemoryApplicationHistoryStore(); - } - return memStore; + public MemoryApplicationHistoryStore() { + super(MemoryApplicationHistoryStore.class.getName()); } @Override public Map getAllApplications() { - Map listApps = - new HashMap(); - for (ApplicationId appId : applicationData.keySet()) { - listApps.put(appId, applicationData.get(appId)); - } - return listApps; + return new HashMap( + applicationData); } @Override @@ -67,111 +76,209 @@ public ApplicationHistoryData getApplication(ApplicationId appId) { } @Override - public Map getApplicationAttempts( - ApplicationId appId) { - Map listAttempts = - null; - ConcurrentHashMap appAttempts = + public Map + getApplicationAttempts( + ApplicationId appId) { + ConcurrentMap subMap = applicationAttemptData.get(appId); - if (appAttempts != null) { - listAttempts = - new HashMap(); - for (ApplicationAttemptId attemptId : appAttempts.keySet()) { - listAttempts.put(attemptId, appAttempts.get(attemptId)); - } + if (subMap == null) { + return Collections.emptyMap(); + } else { + return new HashMap(subMap); } - return listAttempts; } @Override public ApplicationAttemptHistoryData getApplicationAttempt( ApplicationAttemptId appAttemptId) { - ApplicationAttemptHistoryData appAttemptHistoryData = null; - ConcurrentHashMap appAttempts = + ConcurrentMap subMap = applicationAttemptData.get(appAttemptId.getApplicationId()); - if (appAttempts != null) { - appAttemptHistoryData = appAttempts.get(appAttemptId); + if (subMap == null) { + return null; + } else { + return subMap.get(appAttemptId); } - return appAttemptHistoryData; } @Override - public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) { - ContainerHistoryData Container = null; - ConcurrentHashMap appAttempts = - applicationAttemptData.get(appAttemptId.getApplicationId()); - if (appAttempts != null) { - containerData.get(appAttempts.get(appAttemptId).getMasterContainerId()); + public ContainerHistoryData getAMContainer( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptHistoryData appAttempt = + getApplicationAttempt(appAttemptId); + if (appAttempt == null || appAttempt.getMasterContainerId() == null) { + return null; + } else { + return getContainer(appAttempt.getMasterContainerId()); } - return Container; } @Override public ContainerHistoryData getContainer(ContainerId containerId) { - return containerData.get(containerId); + Map subMap = + containerData.get(containerId.getApplicationAttemptId()); + if (subMap == null) { + return null; + } else { + return subMap.get(containerId); + } } @Override - public void writeApplication(ApplicationHistoryData app) throws Throwable { - if (app != null) { - ApplicationHistoryData oldData = - applicationData.putIfAbsent(app.getApplicationId(), app); - if (oldData != null) { - throw new IOException("This application " - + app.getApplicationId().toString() + " is already present."); - } + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + ConcurrentMap subMap = + containerData.get(appAttemptId); + if (subMap == null) { + return Collections.emptyMap(); + } else { + return new HashMap(subMap); } } @Override - public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt) - throws Throwable { - if (appAttempt != null) { - if (applicationAttemptData.containsKey(appAttempt - .getApplicationAttemptId().getApplicationId())) { - ConcurrentHashMap appAttemptmap = - applicationAttemptData.get(appAttempt.getApplicationAttemptId() - .getApplicationId()); - ApplicationAttemptHistoryData oldAppAttempt = - appAttemptmap.putIfAbsent(appAttempt.getApplicationAttemptId(), - appAttempt); - if (oldAppAttempt != null) { - throw new IOException("This application attempt " - + appAttempt.getApplicationAttemptId().toString() - + " already present."); - } - } else { - ConcurrentHashMap appAttemptmap = - new ConcurrentHashMap(); - appAttemptmap.put(appAttempt.getApplicationAttemptId(), appAttempt); - applicationAttemptData.putIfAbsent(appAttempt.getApplicationAttemptId() - .getApplicationId(), appAttemptmap); - } + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + ApplicationHistoryData oldData = + applicationData.putIfAbsent(appStart.getApplicationId(), + ApplicationHistoryData.newInstance( + appStart.getApplicationId(), + appStart.getApplicationName(), + appStart.getApplicationType(), + appStart.getQueue(), + appStart.getUser(), + appStart.getSubmitTime(), + appStart.getStartTime(), + Long.MAX_VALUE, null, null, null)); + if (oldData != null) { + throw new IOException("The start information of application " + + appStart.getApplicationId() + " is already stored."); } } @Override - public void writeContainer(ContainerHistoryData container) throws Throwable { - if (container != null) { - ContainerHistoryData oldContainer = - containerData.putIfAbsent(container.getContainerId(), container); - if (oldContainer != null) { - throw new IOException("This container " - + container.getContainerId().toString() + " is already present."); - } + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + ApplicationHistoryData data = + applicationData.get(appFinish.getApplicationId()); + if (data == null) { + throw new IOException("The finish information of application " + + appFinish.getApplicationId() + " is stored before the start" + + " information."); } + // Make the assumption that YarnApplicationState should not be null if + // the finish information is already recorded + if (data.getYarnApplicationState() != null) { + throw new IOException("The finish information of application " + + appFinish.getApplicationId() + " is already stored."); + } + data.setFinishTime(appFinish.getFinishTime()); + data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo()); + data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus()); + data.setYarnApplicationState(appFinish.getYarnApplicationState()); } @Override - public Map getContainers( - ApplicationAttemptId appAttemptId) throws IOException { - HashMap containers = - new HashMap(); - for (ContainerId container : containerData.keySet()) { - if (container.getApplicationAttemptId().equals(appAttemptId)) { - containers.put(container, containerData.get(container)); - } + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + ConcurrentMap subMap = + getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId()); + ApplicationAttemptHistoryData oldData = subMap.putIfAbsent( + appAttemptStart.getApplicationAttemptId(), + ApplicationAttemptHistoryData.newInstance( + appAttemptStart.getApplicationAttemptId(), + appAttemptStart.getHost(), + appAttemptStart.getRPCPort(), + appAttemptStart.getMasterContainerId(), + null, null, null, null)); + if (oldData != null) { + throw new IOException("The start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + + " is already stored."); } - return containers; } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + ConcurrentMap subMap = + getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId()); + ApplicationAttemptHistoryData data = + subMap.get(appAttemptFinish.getApplicationAttemptId()); + if (data == null) { + throw new IOException("The finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is stored before" + + " the start information."); + } + // Make the assumption that YarnApplicationAttemptState should not be null + // if the finish information is already recorded + if (data.getYarnApplicationAttemptState() != null) { + throw new IOException("The finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + + " is already stored."); + } + data.setTrackingURL(appAttemptFinish.getTrackingURL()); + data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo()); + data.setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus()); + data.setYarnApplicationAttemptState(appAttemptFinish.getYarnApplicationAttemptState()); + } + + private ConcurrentMap + getSubMap(ApplicationId appId) { + applicationAttemptData.putIfAbsent(appId, + new ConcurrentHashMap()); + return applicationAttemptData.get(appId); + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + ConcurrentMap subMap = + getSubMap(containerStart.getContainerId().getApplicationAttemptId()); + ContainerHistoryData oldData = subMap.putIfAbsent( + containerStart.getContainerId(), + ContainerHistoryData.newInstance( + containerStart.getContainerId(), + containerStart.getAllocatedResource(), + containerStart.getAssignedNode(), + containerStart.getPriority(), + containerStart.getStartTime(), + Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null)); + if (oldData != null) { + throw new IOException("The start information of container " + + containerStart.getContainerId() + " is already stored."); + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + ConcurrentMap subMap = + getSubMap(containerFinish.getContainerId().getApplicationAttemptId()); + ContainerHistoryData data = subMap.get(containerFinish.getContainerId()); + if (data == null) { + throw new IOException("The finish information of container " + + containerFinish.getContainerId() + " is stored before" + + " the start information."); + } + // Make the assumption that ContainerState should not be null if + // the finish information is already recorded + if (data.getContainerState() != null) { + throw new IOException("The finish information of container " + + containerFinish.getContainerId() + " is already stored."); + } + data.setFinishTime(containerFinish.getFinishTime()); + data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo()); + data.setLogURL(containerFinish.getLogURL()); + data.setContainerExitStatus(containerFinish.getContainerExitStatus()); + data.setContainerState(containerFinish.getContainerState()); + } + + private ConcurrentMap getSubMap( + ApplicationAttemptId appAttemptId) { + containerData.putIfAbsent(appAttemptId, + new ConcurrentHashMap()); + return containerData.get(appAttemptId); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStoreTestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStoreTestUtils.java new file mode 100644 index 0000000..4eafdb2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStoreTestUtils.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +public class ApplicationHistoryStoreTestUtils { + + protected ApplicationHistoryStore store; + + protected void writeApplicationStartData(ApplicationId appId) + throws IOException { + store.applicationStarted( + ApplicationStartData.newInstance(appId, appId.toString(), "test type", + "test queue", + "test user", 0, 0)); + } + + protected void writeApplicationFinishData(ApplicationId appId) + throws IOException { + store.applicationFinished( + ApplicationFinishData.newInstance( + appId, 0, appId.toString(), FinalApplicationStatus.UNDEFINED, + YarnApplicationState.FINISHED)); + } + + protected void writeApplicationAttemptStartData( + ApplicationAttemptId appAttemptId) throws IOException { + store.applicationAttemptStarted( + ApplicationAttemptStartData.newInstance( + appAttemptId, appAttemptId.toString(), 0, + ContainerId.newInstance(appAttemptId, 1))); + } + + protected void writeApplicationAttemptFinishData( + ApplicationAttemptId appAttemptId) throws IOException { + store.applicationAttemptFinished( + ApplicationAttemptFinishData.newInstance(appAttemptId, + appAttemptId.toString(), "test diagnostics info", + FinalApplicationStatus.UNDEFINED, + YarnApplicationAttemptState.FINISHED)); + } + + protected void writeContainerStartData(ContainerId containerId) + throws IOException { + store.containerStarted( + ContainerStartData.newInstance(containerId, Resource.newInstance(0, 0), + NodeId.newInstance("localhost", 0), + Priority.newInstance(containerId.getId()), 0)); + } + + protected void writeContainerFinishData(ContainerId containerId) + throws IOException { + store.containerFinished( + ContainerFinishData.newInstance(containerId, 0, containerId.toString(), + "http://localhost:0/", 0, ContainerState.COMPLETE)); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestMemoryApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestMemoryApplicationHistoryStore.java index 82a764e..51c01ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestMemoryApplicationHistoryStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestMemoryApplicationHistoryStore.java @@ -19,102 +19,189 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice; import java.io.IOException; -import java.util.HashMap; import junit.framework.Assert; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; -import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl; -import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl; -import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl; -import org.junit.After; import org.junit.Before; import org.junit.Test; -public class TestMemoryApplicationHistoryStore { - MemoryApplicationHistoryStore memstore = null; +public class TestMemoryApplicationHistoryStore extends + ApplicationHistoryStoreTestUtils { @Before - public void setup() throws Throwable { - memstore = MemoryApplicationHistoryStore.getMemoryStore(); - writeHistoryApplication(); - writeHistoryApplicationAttempt(); - writeContainer(); + public void setup() { + store = new MemoryApplicationHistoryStore(); } - public void writeHistoryApplication() throws Throwable { - ApplicationHistoryData appData = new ApplicationHistoryDataPBImpl(); - appData.setApplicationId(ApplicationId.newInstance(1234, 1)); - memstore.writeApplication(appData); - } - - public void writeHistoryApplicationAttempt() throws Throwable { - ApplicationAttemptHistoryData appAttemptHistoryData = - new ApplicationAttemptHistoryDataPBImpl(); - appAttemptHistoryData.setApplicationAttemptId(ApplicationAttemptId - .newInstance(ApplicationId.newInstance(1234, 1), 1)); - memstore.writeApplicationAttempt(appAttemptHistoryData); + @Test + public void testReadWriteApplicationHistory() throws Exception { + // Out of order + ApplicationId appId = ApplicationId.newInstance(0, 1); + try { + writeApplicationFinishData(appId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains( + "is stored before the start information")); + } + // Normal + int numApps = 5; + for (int i = 1; i <= numApps; ++i) { + appId = ApplicationId.newInstance(0, i); + writeApplicationStartData(appId); + writeApplicationFinishData(appId); + } + Assert.assertEquals(numApps, store.getAllApplications().size()); + for (int i = 1; i <= numApps; ++i) { + appId = ApplicationId.newInstance(0, i); + ApplicationHistoryData data = store.getApplication(appId); + Assert.assertNotNull(data); + Assert.assertEquals(appId.toString(), data.getApplicationName()); + Assert.assertEquals(appId.toString(), data.getDiagnosticsInfo()); + } + // Write again + appId = ApplicationId.newInstance(0, 1); + try { + writeApplicationStartData(appId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is already stored")); + } + try { + writeApplicationFinishData(appId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is already stored")); + } } - public void writeContainer() throws Throwable { - ContainerHistoryData container = new ContainerHistoryDataPBImpl(); - container.setContainerId(ContainerId.newInstance(ApplicationAttemptId - .newInstance(ApplicationId.newInstance(1234, 1), 1), 1)); - memstore.writeContainer(container); + @Test + public void testReadWriteApplicationAttemptHistory() throws Exception { + // Out of order + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + try { + writeApplicationAttemptFinishData(appAttemptId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains( + "is stored before the start information")); + } + // Normal + int numAppAttempts = 5; + writeApplicationStartData(appId); + for (int i = 1; i <= numAppAttempts; ++i) { + appAttemptId = + ApplicationAttemptId.newInstance(appId, i); + writeApplicationAttemptStartData(appAttemptId); + writeApplicationAttemptFinishData(appAttemptId); + } + Assert.assertEquals( + numAppAttempts, store.getApplicationAttempts(appId).size()); + for (int i = 1; i <= numAppAttempts; ++i) { + appAttemptId = + ApplicationAttemptId.newInstance(appId, i); + ApplicationAttemptHistoryData data = + store.getApplicationAttempt(appAttemptId); + Assert.assertNotNull(data); + Assert.assertEquals(appAttemptId.toString(), data.getHost()); + Assert.assertEquals(appAttemptId.toString(), data.getDiagnosticsInfo()); + } + writeApplicationFinishData(appId); + // Write again + appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + try { + writeApplicationAttemptStartData(appAttemptId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is already stored")); + } + try { + writeApplicationAttemptFinishData(appAttemptId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is already stored")); + } } - public ContainerHistoryData writeContainer(ApplicationAttemptId appAttemptId, - int containerId) throws Throwable { - ContainerHistoryData container = new ContainerHistoryDataPBImpl(); - container - .setContainerId(ContainerId.newInstance(appAttemptId, containerId)); - memstore.writeContainer(container); - return container; - } - - @After - public void tearDown() { + @Test + public void testReadWriteContainerHistory() throws Exception { + // Out of order + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); + try { + writeContainerFinishData(containerId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains( + "is stored before the start information")); + } + // Normal + writeApplicationAttemptStartData(appAttemptId); + int numContainers = 5; + for (int i = 1; i <= numContainers; ++i) { + containerId = ContainerId.newInstance(appAttemptId, i); + writeContainerStartData(containerId); + writeContainerFinishData(containerId); + } + Assert.assertEquals( + numContainers, store.getContainers(appAttemptId).size()); + for (int i = 1; i <= numContainers; ++i) { + containerId = ContainerId.newInstance(appAttemptId, i); + ContainerHistoryData data = store.getContainer(containerId); + Assert.assertNotNull(data); + Assert.assertEquals(Priority.newInstance(containerId.getId()), + data.getPriority()); + Assert.assertEquals(containerId.toString(), data.getDiagnosticsInfo()); + } + ContainerHistoryData masterContainer = store.getAMContainer(appAttemptId); + Assert.assertNotNull(masterContainer); + Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1), + masterContainer.getContainerId()); + writeApplicationAttemptFinishData(appAttemptId); + // Write again + containerId = ContainerId.newInstance(appAttemptId, 1); + try { + writeContainerStartData(containerId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is already stored")); + } + try { + writeContainerFinishData(containerId); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("is already stored")); + } } @Test - public void testReadApplication() throws Throwable { - HashMap map = - (HashMap) memstore - .getAllApplications(); - Assert.assertEquals(1, map.size()); - ApplicationHistoryData appData = null; - for (ApplicationId appId : map.keySet()) { - appData = map.get(appId); - Assert.assertEquals("application_1234_0001", appData.getApplicationId() - .toString()); - } - HashMap appAttempts = - (HashMap) memstore - .getApplicationAttempts(appData.getApplicationId()); - Assert.assertEquals(1, appAttempts.size()); - ApplicationAttemptHistoryData appAttempt = null; - for (ApplicationAttemptId appAttemptId : appAttempts.keySet()) { - appAttempt = appAttempts.get(appAttemptId); - Assert.assertEquals("appattempt_1234_0001_000001", appAttempt - .getApplicationAttemptId().toString()); - } - ContainerHistoryData amContainer = - memstore.getContainer(ContainerId.newInstance(ApplicationAttemptId - .newInstance(ApplicationId.newInstance(1234, 1), 1), 1)); - Assert.assertEquals("container_1234_0001_01_000001", amContainer - .getContainerId().toString()); - ContainerHistoryData container2 = - writeContainer(appAttempt.getApplicationAttemptId(), 2); - HashMap containers = - (HashMap) memstore - .getContainers(appAttempt.getApplicationAttemptId()); - Assert.assertEquals(2, containers.size()); - Assert.assertEquals("container_1234_0001_01_000002", containers.get( - container2.getContainerId()).getContainerId().toString()); + public void testMassiveWriteContainerHistory() throws IOException { + long mb = 1024 * 1024; + Runtime runtime = Runtime.getRuntime(); + long usedMemoryBefore = (runtime.totalMemory() - runtime.freeMemory()) / mb; + int numContainers = 100000; + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + for (int i = 1; i <= numContainers; ++i) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, i); + writeContainerStartData(containerId); + writeContainerFinishData(containerId); + } + long usedMemoryAfter = (runtime.totalMemory() - runtime.freeMemory()) / mb; + Assert.assertTrue((usedMemoryAfter - usedMemoryBefore) < 100); } + }