diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index ed04555..e4ab17f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -40,5 +40,15 @@ org.apache.hadoop hadoop-yarn-server-common + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + test-jar + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/HistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/HistoryWriter.java new file mode 100644 index 0000000..bd41101 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/HistoryWriter.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.pb.impl.ApplicationAttemptHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.pb.impl.ApplicationHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.pb.impl.ContainerHistoryDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + *

+ * It is the base class of writing the application history. This class exposes + * the interfaces of writing {@link RMApp}, {@RMAppAttempt} and {@RMContainer}. + * These interfaces are non-blocking, and just schedule a writing history event. + * A dispatcher will handle the event in a separate thread, and extract the + * exact fields that are going to be persisted. The implementation of this class + * should define the detailed persistence operations, which are supposed to be + * blocking. + *

+ */ +@Private +@Unstable +public abstract class HistoryWriter { + + public static final Log LOG = LogFactory.getLog(HistoryWriter.class); + + protected Dispatcher dispatcher; + + public HistoryWriter(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + this.dispatcher.register( + WritingHistoryEventType.class, new ForwardingEventHandler()); + } + + /** + * Non-blocking API + * {@link ResourceManager} should call this method to record the finished + * {@link RMApp}. + * @param app the {@link RMApp} object to be recorded + */ + @SuppressWarnings("unchecked") + public void writeApplication(RMApp app) { + dispatcher.getEventHandler().handle(new WritingAppHistoryEvent(app)); + } + + /** + * Blocking API + * The subclass needs to implement this method to persist the protobuf + * object of an application. + * This method must not be called on the thread of the dispatcher. + * @param app the protobuf object of Application that contains the fields for + * persistence + * @throws Throwable + */ + protected abstract void writeApplication( + ApplicationHistoryData app) throws Throwable; + + /** + * Non-blocking API + * {@link ResourceManager} should call this method to record the finished + * {@link }. + * @param appAttempt the {@link RMAppAttempt} object to be recorded + */ + @SuppressWarnings("unchecked") + public void writeApplicationAttempt(RMAppAttempt appAttempt) { + dispatcher.getEventHandler().handle( + new WritingAppAttemptHistoryEvent(appAttempt)); + } + + /** + * Blocking API + * The subclass needs to implement this method to persist the protobuf + * object of an application attempt. + * This method must not be called on the thread of the dispatcher. + * @param appAttempt the protobuf object of AppAttempt that contains the + * fields for persistence + * @throws Throwable + */ + protected abstract void writeApplicationAttempt( + ApplicationAttemptHistoryData appAttempt) throws Throwable; + + /** + * Non-blocking API + * {@link ResourceManager} should call this method to record the finished + * {@link RMContainer}. + * @param container the {@link RMContainer} object to be recorded + */ + @SuppressWarnings("unchecked") + public void writeContainer(RMContainer container) { + dispatcher.getEventHandler().handle( + new WritingContainerHistoryEvent(container)); + } + + /** + * Blocking API + * The subclass needs to implement this method to persist the protobuf + * object of a container. + * This method must not be called on the thread of the dispatcher. + * @param container the protobuf object of Container that contains the fields + * for persistence + * @throws Throwable + */ + protected abstract void writeContainer( + ContainerHistoryData container) throws Throwable; + + protected void handleWritingHistoryEvent(WritingHistoryEvent event) { + switch(event.getType()) { + case WRITING_APP: + WritingAppHistoryEvent appEvent = (WritingAppHistoryEvent) event; + RMApp app = appEvent.getApplication(); + ApplicationHistoryData appHD = new ApplicationHistoryDataPBImpl(); + assert app.getApplicationId() instanceof ApplicationIdPBImpl; + appHD.setApplicationId(app.getApplicationId()); + appHD.setApplicationType(app.getApplicationType()); + appHD.setApplicationName(app.getName()); + appHD.setQueue(app.getQueue()); + appHD.setUser(app.getUser()); + appHD.setSubmitTime(app.getSubmitTime()); + appHD.setStartTime(app.getStartTime()); + appHD.setFinishTime(app.getFinishTime()); + appHD.setDiagnosticsInfo(app.getDiagnostics().toString()); + appHD.setFinalApplicationStatus(app.getFinalApplicationStatus()); + try { + LOG.info("Writing the history data for Application " + + appHD.getApplicationId()); + writeApplication(appHD); + } catch (Throwable t) { + LOG.error("Failed to write the history data for Application " + + appHD.getApplicationId()); + } + break; + case WRITING_APP_ATTEMPT: + WritingAppAttemptHistoryEvent appAttemptEvent = + (WritingAppAttemptHistoryEvent) event; + RMAppAttempt appAttempt = appAttemptEvent.getApplicationAttempt(); + ApplicationAttemptHistoryData appAttemptHD = + new ApplicationAttemptHistoryDataPBImpl(); + assert appAttempt.getAppAttemptId() + instanceof ApplicationAttemptIdPBImpl; + appAttemptHD.setApplicationAttemptId(appAttempt.getAppAttemptId()); + appAttemptHD.setHost(appAttempt.getHost()); + appAttemptHD.setRPCPort(appAttempt.getRpcPort()); + assert appAttempt.getMasterContainer().getId() + instanceof ContainerIdPBImpl; + appAttemptHD.setMasterContainerId( + appAttempt.getMasterContainer().getId()); + appAttemptHD.setDiagnosticsInfo(appAttempt.getDiagnostics()); + appAttemptHD.setTrackingURL(appAttempt.getTrackingUrl()); + appAttemptHD.setFinalApplicationStatus( + appAttempt.getFinalApplicationStatus()); + try { + LOG.info("Writing the history data for ApplicationAttempt " + + appAttemptHD.getApplicationAttemptId()); + writeApplicationAttempt(appAttemptHD); + } catch (Throwable t) { + LOG.error("Failed to write the history data for ApplicationAttempt " + + appAttemptHD.getApplicationAttemptId()); + } + break; + case WRITING_CONTAINER: + WritingContainerHistoryEvent containerEvent = + (WritingContainerHistoryEvent) event; + RMContainer container = containerEvent.getContainer(); + ContainerHistoryData containerHD = new ContainerHistoryDataPBImpl(); + // TODO: RMContainer needs to record more information to fill the blank + // fields of ContainerIdPBImpl bellow + assert container.getContainerId() instanceof ContainerIdPBImpl; + containerHD.setContainerId(container.getContainerId()); + containerHD.setAllocatedResource(null); + containerHD.setAssignedNode(null); + containerHD.setPriority(null); + containerHD.setStartTime(0); + containerHD.setFinishTime(0); + containerHD.setDiagnosticsInfo(null); + containerHD.setLogURL(null); + containerHD.setFinalContainerStatus(ContainerState.COMPLETE); + try { + LOG.info("Writing the history data for Container " + + containerHD.getContainerId()); + writeContainer(containerHD); + } catch (Throwable t) { + LOG.error("Failed to write the history data for Container " + + containerHD.getContainerId()); + } + break; + default: + LOG.error("Unknown WritingHistoryEvent type: " + event.getType()); + } + } + + /** + * EventHandler implementation which forward events to HistoryWriter + * Making use of it, HistoryWriter can avoid to have a public handle method + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + public void handle(WritingHistoryEvent event) { + handleWritingHistoryEvent(event); + } + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingAppAttemptHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingAppAttemptHistoryEvent.java new file mode 100644 index 0000000..8bd30f9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingAppAttemptHistoryEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; + + +public class WritingAppAttemptHistoryEvent extends WritingHistoryEvent { + + private RMAppAttempt appAttempt; + + public WritingAppAttemptHistoryEvent(RMAppAttempt appAttempt) { + super(WritingHistoryEventType.WRITING_APP_ATTEMPT); + this.appAttempt = appAttempt; + } + + public RMAppAttempt getApplicationAttempt() { + return appAttempt; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingAppHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingAppHistoryEvent.java new file mode 100644 index 0000000..51d43b5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingAppHistoryEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; + + +public class WritingAppHistoryEvent extends WritingHistoryEvent { + + private RMApp app; + + public WritingAppHistoryEvent(RMApp app) { + super(WritingHistoryEventType.WRITING_APP); + this.app = app; + } + + public RMApp getApplication() { + return app; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingContainerHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingContainerHistoryEvent.java new file mode 100644 index 0000000..c7ec930 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingContainerHistoryEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + + +public class WritingContainerHistoryEvent extends WritingHistoryEvent { + + private RMContainer container; + + public WritingContainerHistoryEvent(RMContainer container) { + super(WritingHistoryEventType.WRITING_CONTAINER); + this.container = container; + } + + public RMContainer getContainer() { + return container; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingHistoryEvent.java new file mode 100644 index 0000000..486aa06 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingHistoryEvent.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.hadoop.yarn.event.AbstractEvent; + + +public class WritingHistoryEvent extends AbstractEvent { + + public WritingHistoryEvent(WritingHistoryEventType type) { + super(type); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingHistoryEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingHistoryEventType.java new file mode 100644 index 0000000..70443cf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/WritingHistoryEventType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + + +public enum WritingHistoryEventType { + WRITING_APP, + WRITING_APP_ATTEMPT, + WRITING_CONTAINER +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestHistoryWriter.java new file mode 100644 index 0000000..535c511 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestHistoryWriter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestHistoryWriter { + + private DrainDispatcher dispatcher; + private MyHistoryWriter historyWriter; + + @Before + public void setup() { + dispatcher = new DrainDispatcher(); + dispatcher.init(new Configuration()); + dispatcher.start(); + historyWriter = new MyHistoryWriter(dispatcher); + } + + @After + public void tearDown() { + dispatcher.stop(); + } + + @Test + public void testWriteApplication() { + RMApp app = + new MockRMApp(1, System.currentTimeMillis(), RMAppState.RUNNING); + historyWriter.writeApplication(app); + dispatcher.await(); + Assert.assertTrue(historyWriter.appWritten.get()); + } + + @Test + public void testWriteApplicationAttempt() { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), 1)); + when(appAttempt.getHost()).thenReturn("localhost"); + when(appAttempt.getRpcPort()).thenReturn(9000); + Container container = mock(Container.class); + when(container.getId()).thenReturn( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("N/A"); + when(appAttempt.getTrackingUrl()).thenReturn("http://localhost/log"); + when(appAttempt.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.SUCCEEDED); + historyWriter.writeApplicationAttempt(appAttempt); + dispatcher.await(); + Assert.assertTrue(historyWriter.appAttemptWritten.get()); + } + + @Test + public void testWriteContainer() { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1)); + historyWriter.writeContainer(container); + dispatcher.await(); + Assert.assertTrue(historyWriter.containerWritten.get()); + } + + private static class MyHistoryWriter extends HistoryWriter { + + private AtomicBoolean appWritten = new AtomicBoolean(false); + private AtomicBoolean appAttemptWritten = new AtomicBoolean(false); + private AtomicBoolean containerWritten = new AtomicBoolean(false); + + public MyHistoryWriter(Dispatcher dispatcher) { + super(dispatcher); + } + + @Override + protected void writeApplication(ApplicationHistoryData app) + throws Throwable { + appWritten.set(true); + } + + @Override + protected void writeApplicationAttempt( + ApplicationAttemptHistoryData appAttempt) throws Throwable { + appAttemptWritten.set(true); + } + + @Override + protected void writeContainer(ContainerHistoryData container) + throws Throwable { + containerWritten.set(true); + } + + } + +}