diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 28101cc..776685b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -31,8 +32,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
/**
@@ -74,4 +75,10 @@
void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager);
+
+ RMApplicationHistoryWriter getRMApplicationHistoryWriter();
+
+ void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter rmApplicationHistoryWriter);
+
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index d2592ed..7d8be63 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -34,8 +35,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +65,7 @@
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
+ private RMApplicationHistoryWriter rmApplicationHistoryWriter;
public RMContextImpl(Dispatcher rmDispatcher,
RMStateStore store,
@@ -74,7 +76,8 @@ public RMContextImpl(Dispatcher rmDispatcher,
AMRMTokenSecretManager amRMTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
- ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmDispatcher = rmDispatcher;
this.stateStore = store;
this.containerAllocationExpirer = containerAllocationExpirer;
@@ -85,6 +88,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
+ this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@VisibleForTesting
@@ -97,11 +101,12 @@ public RMContextImpl(Dispatcher rmDispatcher,
AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
- ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
- this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
- amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
- containerTokenSecretManager, nmTokenSecretManager,
- clientToAMTokenSecretManager);
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
+ amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
+ containerTokenSecretManager, nmTokenSecretManager,
+ clientToAMTokenSecretManager, rmApplicationHistoryWriter);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
@@ -202,4 +207,16 @@ public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
+
+ @Override
+ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+ return rmApplicationHistoryWriter;
+ }
+
+ @Override
+ public void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+ }
+
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 41b119e..cdfe676 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
@@ -266,6 +267,10 @@ protected RMAppManager createRMAppManager() {
this.applicationACLsManager, this.conf);
}
+ protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
+ return new RMApplicationHistoryWriter();
+ }
+
// sanity check for configurations
protected static void validateConfigs(Configuration conf) {
// validate max-attempts
@@ -350,11 +355,15 @@ protected void serviceInit(Configuration configuration) throws Exception {
delegationTokenRenewer = createDelegationTokenRenewer();
}
+ RMApplicationHistoryWriter rmApplicationHistoryWriter =
+ createRMApplicationHistoryWriter();
+ addService(rmApplicationHistoryWriter);
+
rmContext = new RMContextImpl(
rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
- clientToAMSecretManager);
+ clientToAMSecretManager, rmApplicationHistoryWriter);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
new file mode 100644
index 0000000..235dd403b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.DummyApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ *
+ * {@link ResourceManager} uses this class to write the information of
+ * {@link RMApp}, {@link RMAppAttempt} and {@link RMContainer}. These APIs are
+ * non-blocking, and just schedule a writing history event. An self-contained
+ * dispatcher vector will handle the event in separate threads, and extract the
+ * required fields that are going to be persisted. Then, the extracted
+ * information will be persisted via the implementation of
+ * {@link ApplicationHistoryStore}.
+ *
+ */
+@Private
+@Unstable
+public class RMApplicationHistoryWriter extends CompositeService {
+
+ public static final Log LOG =
+ LogFactory.getLog(RMApplicationHistoryWriter.class);
+
+ private Dispatcher dispatcher;
+ private ApplicationHistoryWriter writer;
+
+ public RMApplicationHistoryWriter() {
+ super(RMApplicationHistoryWriter.class.getName());
+ }
+
+ @Override
+ protected synchronized void serviceInit(
+ Configuration conf) throws Exception {
+ writer = createApplicationHistoryStore(conf);
+ addIfService(writer);
+
+ dispatcher = createDispatcher(conf);
+ dispatcher.register(
+ WritingHistoryEventType.class, new ForwardingEventHandler());
+ addIfService(dispatcher);
+ super.serviceInit(conf);
+ }
+
+ protected Dispatcher createDispatcher(Configuration conf) {
+ HomoDispatcherVector dispatcher = new HomoDispatcherVector(conf.getInt(
+ YarnConfiguration.RM_HISTORY_WRITER_COMPSITE_DISPATCHER_SIZE,
+ YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_COMPSITE_DISPATCHER_SIZE));
+ dispatcher.setDrainEventsOnStop();
+ return dispatcher;
+ }
+
+ protected ApplicationHistoryStore createApplicationHistoryStore(
+ Configuration conf) {
+ boolean ahsEnabled = conf.getBoolean(
+ YarnConfiguration.RM_HISTORY_WRITER_ENABLED,
+ YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_ENABLED);
+ // If the history writer is not enabled, a dummy store will be used to
+ // write nothing
+ if (ahsEnabled) {
+ try {
+ Class extends ApplicationHistoryStore> storeClass =
+ conf.getClass(YarnConfiguration.RM_HISTORY_WRITER_STORE_CLASS,
+ DummyApplicationHistoryStore.class,
+ ApplicationHistoryStore.class);
+ return storeClass.newInstance();
+ } catch (Exception e) {
+ String msg = "Could not instantiate ApplicationHistoryWriter: " +
+ conf.get(YarnConfiguration.RM_HISTORY_WRITER_STORE_CLASS,
+ DummyApplicationHistoryStore.class.getName());
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ } else {
+ return new DummyApplicationHistoryStore();
+ }
+ }
+
+ protected void handleWritingApplicationHistoryEvent(
+ WritingApplicationHistoryEvent event) {
+ switch (event.getType()) {
+ case WRITING_APP_START:
+ WritingApplicationStartEvent wasEvent =
+ (WritingApplicationStartEvent) event;
+ try {
+ writer.applicationStarted(wasEvent.getApplicationStartData());
+ LOG.info("Stored the start data of application "
+ + wasEvent.getApplicationId());
+ } catch (IOException e) {
+ LOG.error("Error when storing the start data of application "
+ + wasEvent.getApplicationId());
+ }
+ break;
+ case WRITING_APP_FINISH:
+ WritingApplicationFinishEvent wafEvent =
+ (WritingApplicationFinishEvent) event;
+ try {
+ writer.applicationFinished(wafEvent.getApplicationFinishData());
+ LOG.info("Stored the finish data of application "
+ + wafEvent.getApplicationId());
+ } catch (IOException e) {
+ LOG.error("Error when storing the finish data of application "
+ + wafEvent.getApplicationId());
+ }
+ break;
+ case WRITING_APP_ATTEMPT_START:
+ WritingApplicationAttemptStartEvent waasEvent =
+ (WritingApplicationAttemptStartEvent) event;
+ try {
+ writer.applicationAttemptStarted(waasEvent
+ .getApplicationAttemptStartData());
+ LOG.info("Stored the start data of application attempt "
+ + waasEvent.getApplicationAttemptId());
+ } catch (IOException e) {
+ LOG.error("Error when storing the start data of application attempt "
+ + waasEvent.getApplicationAttemptId());
+ }
+ break;
+ case WRITING_APP_ATTEMPT_FINISH:
+ WritingApplicationAttemptFinishEvent waafEvent =
+ (WritingApplicationAttemptFinishEvent) event;
+ try {
+ writer.applicationAttemptFinished(waafEvent
+ .getApplicationAttemptFinishData());
+ LOG.info("Stored the finish data of application attempt "
+ + waafEvent.getApplicationAttemptId());
+ } catch (IOException e) {
+ LOG.error("Error when storing the finish data of application attempt "
+ + waafEvent.getApplicationAttemptId());
+ }
+ break;
+ case WRITING_CONTAINER_START:
+ WritingContainerStartEvent wcsEvent =
+ (WritingContainerStartEvent) event;
+ try {
+ writer.containerStarted(wcsEvent.getContainerStartData());
+ LOG.info("Stored the start data of container "
+ + wcsEvent.getContainerId());
+ } catch (IOException e) {
+ LOG.error("Error when storing the start data of container "
+ + wcsEvent.getContainerId());
+ }
+ break;
+ case WRITING_CONTAINER_FINISH:
+ WritingContainerFinishEvent wcfEvent =
+ (WritingContainerFinishEvent) event;
+ try {
+ writer.containerFinished(wcfEvent.getContainerFinishData());
+ LOG.info("Stored the finish data of container "
+ + wcfEvent.getContainerId());
+ } catch (IOException e) {
+ LOG.error("Error when storing the finish data of container "
+ + wcfEvent.getContainerId());
+ }
+ break;
+ default:
+ LOG.error("Unknown WritingApplicationHistoryEvent type: "
+ + event.getType());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void applicationStarted(RMApp app) {
+ dispatcher.getEventHandler().handle(
+ new WritingApplicationStartEvent(app.getApplicationId(),
+ ApplicationStartData.newInstance(app.getApplicationId(),
+ app.getName(), app.getApplicationType(), app.getQueue(),
+ app.getUser(), app.getSubmitTime(), app.getStartTime())));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void applicationFinished(RMApp app) {
+ dispatcher.getEventHandler().handle(
+ new WritingApplicationFinishEvent(app.getApplicationId(),
+ ApplicationFinishData.newInstance(app.getApplicationId(),
+ app.getFinishTime(),
+ app.getDiagnostics().toString(),
+ app.getFinalApplicationStatus(),
+ app.createApplicationState())));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void applicationAttemptStarted(RMAppAttempt appAttempt) {
+ dispatcher.getEventHandler().handle(
+ new WritingApplicationAttemptStartEvent(appAttempt.getAppAttemptId(),
+ ApplicationAttemptStartData.newInstance(
+ appAttempt.getAppAttemptId(), appAttempt.getHost(),
+ appAttempt.getRpcPort(), appAttempt.getMasterContainer()
+ .getId())));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void applicationAttemptFinished(RMAppAttempt appAttempt) {
+ dispatcher.getEventHandler().handle(
+ new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(),
+ ApplicationAttemptFinishData.newInstance(appAttempt
+ .getAppAttemptId(), appAttempt.getDiagnostics().toString(),
+ appAttempt.getTrackingUrl(), appAttempt
+ .getFinalApplicationStatus(), appAttempt
+ .createApplicationAttemptState())));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void containerStarted(RMContainer container) {
+ dispatcher.getEventHandler().handle(
+ new WritingContainerStartEvent(container.getContainerId(),
+ ContainerStartData.newInstance(container.getContainerId(),
+ container.getAllocatedResource(), container.getAllocatedNode(),
+ container.getAllocatedPriority(), container.getStartTime())));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void containerFinished(RMContainer container) {
+ dispatcher.getEventHandler().handle(
+ new WritingContainerFinishEvent(container.getContainerId(),
+ ContainerFinishData.newInstance(container.getContainerId(),
+ container.getFinishTime(), container.getDiagnosticsInfo(),
+ container.getLogURL(), container.getContainerExitStatus(),
+ container.getContainerState())));
+ }
+
+ /**
+ * EventHandler implementation which forward events to HistoryWriter Making
+ * use of it, HistoryWriter can avoid to have a public handle method
+ */
+ private final class ForwardingEventHandler
+ implements EventHandler {
+
+ @Override
+ public void handle(WritingApplicationHistoryEvent event) {
+ handleWritingApplicationHistoryEvent(event);
+ }
+
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ protected static class HomoDispatcherVector
+ extends CompositeService implements Dispatcher {
+
+ private List dispatchers =
+ new ArrayList();
+
+ public HomoDispatcherVector(int num) {
+ super(HomoDispatcherVector.class.getName());
+ for (int i = 0; i < num; ++i) {
+ AsyncDispatcher dispatcher = createDispatcher();
+ dispatchers.add(dispatcher);
+ addIfService(dispatcher);
+ }
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return new CompositEventHandler();
+ }
+
+ @Override
+ public void register(Class extends Enum> eventType, EventHandler handler) {
+ for (AsyncDispatcher dispatcher : dispatchers) {
+ dispatcher.register(eventType, handler);
+ }
+ }
+
+ public void setDrainEventsOnStop() {
+ for (AsyncDispatcher dispatcher : dispatchers) {
+ dispatcher.setDrainEventsOnStop();
+ }
+ }
+
+ private class CompositEventHandler implements EventHandler {
+
+ @Override
+ public void handle(Event event) {
+ // Use hashCode (of ApplicationId) to dispatch the event to the child
+ // dispatcher, such that all the writing events of one application will
+ // be handled by one thread, the scheduled order of the these events
+ // will be preserved
+ int index = Math.abs(event.hashCode()) % dispatchers.size();
+ dispatchers.get(index).getEventHandler().handle(event);
+ }
+
+ }
+
+ protected AsyncDispatcher createDispatcher() {
+ return new AsyncDispatcher();
+ }
+
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java
new file mode 100644
index 0000000..10bfce2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+
+public class WritingApplicationAttemptFinishEvent extends
+ WritingApplicationHistoryEvent {
+
+ private ApplicationAttemptId appAttemptId;
+ private ApplicationAttemptFinishData appAttemptFinish;
+
+ public WritingApplicationAttemptFinishEvent(
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptFinishData appAttemptFinish) {
+ super(WritingHistoryEventType.WRITING_APP_ATTEMPT_FINISH);
+ this.appAttemptId = appAttemptId;
+ this.appAttemptFinish = appAttemptFinish;
+ }
+
+ @Override
+ public int hashCode() {
+ return appAttemptId.getApplicationId().hashCode();
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appAttemptId;
+ }
+
+ public ApplicationAttemptFinishData getApplicationAttemptFinishData() {
+ return appAttemptFinish;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java
new file mode 100644
index 0000000..2b25ba2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+
+public class WritingApplicationAttemptStartEvent extends
+ WritingApplicationHistoryEvent {
+
+ private ApplicationAttemptId appAttemptId;
+ private ApplicationAttemptStartData appAttemptStart;
+
+ public WritingApplicationAttemptStartEvent(ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStartData appAttemptStart) {
+ super(WritingHistoryEventType.WRITING_APP_ATTEMPT_START);
+ this.appAttemptId = appAttemptId;
+ this.appAttemptStart = appAttemptStart;
+ }
+
+ @Override
+ public int hashCode() {
+ return appAttemptId.getApplicationId().hashCode();
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appAttemptId;
+ }
+
+ public ApplicationAttemptStartData getApplicationAttemptStartData() {
+ return appAttemptStart;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java
new file mode 100644
index 0000000..1429d2c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+
+public class WritingApplicationFinishEvent extends
+ WritingApplicationHistoryEvent {
+
+ private ApplicationId appId;
+ private ApplicationFinishData appFinish;
+
+ public WritingApplicationFinishEvent(ApplicationId appId,
+ ApplicationFinishData appFinish) {
+ super(WritingHistoryEventType.WRITING_APP_FINISH);
+ this.appId = appId;
+ this.appFinish = appFinish;
+ }
+
+ @Override
+ public int hashCode() {
+ return appId.hashCode();
+ }
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+
+ public ApplicationFinishData getApplicationFinishData() {
+ return appFinish;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java
new file mode 100644
index 0000000..bc17edc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class WritingApplicationHistoryEvent extends
+ AbstractEvent {
+
+ public WritingApplicationHistoryEvent(WritingHistoryEventType type) {
+ super(type);
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java
new file mode 100644
index 0000000..f8790e7
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+
+public class WritingApplicationStartEvent extends
+ WritingApplicationHistoryEvent {
+
+ private ApplicationId appId;
+ private ApplicationStartData appStart;
+
+ public WritingApplicationStartEvent(ApplicationId appId,
+ ApplicationStartData appStart) {
+ super(WritingHistoryEventType.WRITING_APP_START);
+ this.appId = appId;
+ this.appStart = appStart;
+ }
+
+ @Override
+ public int hashCode() {
+ return appId.hashCode();
+ }
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+
+ public ApplicationStartData getApplicationStartData() {
+ return appStart;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java
new file mode 100644
index 0000000..ec455fd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+
+public class WritingContainerFinishEvent extends WritingApplicationHistoryEvent {
+
+ private ContainerId containerId;
+ private ContainerFinishData containerFinish;
+
+ public WritingContainerFinishEvent(ContainerId containerId,
+ ContainerFinishData containerFinish) {
+ super(WritingHistoryEventType.WRITING_CONTAINER_FINISH);
+ this.containerId = containerId;
+ this.containerFinish = containerFinish;
+ }
+
+ @Override
+ public int hashCode() {
+ return containerId.getApplicationAttemptId().getApplicationId().hashCode();
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public ContainerFinishData getContainerFinishData() {
+ return containerFinish;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java
new file mode 100644
index 0000000..90402f0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+public class WritingContainerStartEvent extends WritingApplicationHistoryEvent {
+
+ private ContainerId containerId;
+ private ContainerStartData containerStart;
+
+ public WritingContainerStartEvent(ContainerId containerId,
+ ContainerStartData containerStart) {
+ super(WritingHistoryEventType.WRITING_CONTAINER_START);
+ this.containerId = containerId;
+ this.containerStart = containerStart;
+ }
+
+ @Override
+ public int hashCode() {
+ return containerId.getApplicationAttemptId().getApplicationId().hashCode();
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public ContainerStartData getContainerStartData() {
+ return containerStart;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java
new file mode 100644
index 0000000..45ffa4a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+
+public enum WritingHistoryEventType {
+ WRITING_APP_START,
+ WRITING_APP_FINISH,
+ WRITING_APP_ATTEMPT_START,
+ WRITING_APP_ATTEMPT_FINISH,
+ WRITING_CONTAINER_START,
+ WRITING_CONTAINER_FINISH
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index e3b083c..517c7da 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -302,6 +302,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
+
+ rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
}
@Override
@@ -908,6 +910,10 @@ private static String getAppKilledDiagnostics() {
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
+ // In this case, RMApp will enter the final state before RMAppAttempt
+ // Therefore, notify not to write the finish data immediately and let
+ // RMAppAttempt to write the finish data first
+ super.writeAppFinished = false;
super.transition(app, event);
}
}
@@ -922,6 +928,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static class FinalTransition extends RMAppTransition {
+ protected boolean writeAppFinished = true;
+
private Set getNodesOnWhichAttemptRan(RMAppImpl app) {
Set nodes = new HashSet();
for (RMAppAttempt attempt : app.attempts.values()) {
@@ -944,6 +952,11 @@ public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
+
+ if (writeAppFinished) {
+ app.rmContext.getRMApplicationHistoryWriter()
+ .applicationFinished(app);
+ }
};
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 335dbda..3a666dd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -178,4 +179,21 @@
* @return the start time of the application.
*/
long getStartTime();
+
+ /**
+ * The current state of the {@link RMAppAttempt}.
+ *
+ * @return the current state {@link RMAppAttemptState} for this application
+ * attempt.
+ */
+ RMAppAttemptState getState();
+
+ /**
+ * Create the external user-facing state of the attempt of ApplicationMaster
+ * from the current state of the {@link RMAppAttempt}.
+ *
+ * @return the external user-facing state of the attempt ApplicationMaster.
+ */
+ YarnApplicationAttemptState createApplicationAttemptState();
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1247bb7..03fece7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -61,6 +62,7 @@
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -68,6 +70,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
@@ -783,6 +786,9 @@ public void transition(RMAppAttemptImpl appAttempt,
);
appAttempt.removeCredentials(appAttempt);
+
+ appAttempt.rmContext.getRMApplicationHistoryWriter()
+ .applicationAttemptFinished(appAttempt);
}
}
@@ -1045,6 +1051,17 @@ public void transition(RMAppAttemptImpl appAttempt,
finalAttemptState));
appAttempt.removeCredentials(appAttempt);
+
+ appAttempt.rmContext.getRMApplicationHistoryWriter()
+ .applicationAttemptFinished(appAttempt);
+ // In this case, we need to record the finish data or RMApp as well,
+ // because it will not write the data there
+ if (finalAttemptState == RMAppAttemptState.KILLED) {
+ RMApp app = appAttempt.rmContext.getRMApps().get(
+ appAttemptId.getApplicationId());
+ appAttempt.rmContext.getRMApplicationHistoryWriter()
+ .applicationFinished(app);
+ }
}
}
@@ -1147,6 +1164,9 @@ public void transition(RMAppAttemptImpl appAttempt,
// write at AM launch time, so we don't save the AM's tracking URL anywhere
// as that would mean an extra state-store write. For now, we hope that in
// work-preserving restart, AMs are forced to reregister.
+
+ appAttempt.rmContext.getRMApplicationHistoryWriter()
+ .applicationAttemptStarted(appAttempt);
}
}
@@ -1504,6 +1524,23 @@ public long getStartTime() {
}
}
+ @Override
+ public RMAppAttemptState getState() {
+ this.readLock.lock();
+
+ try {
+ return this.stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public YarnApplicationAttemptState createApplicationAttemptState() {
+ RMAppAttemptState state = getState();
+ return RMServerUtils.createApplicationAttemptState(state);
+ }
+
private void launchAttempt(){
// Send event to launch the AM Container
eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index b56f424..adffb99 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -139,6 +140,7 @@
private final ApplicationAttemptId appAttemptId;
private final NodeId nodeId;
private final Container container;
+ private final RMContext rmContext;
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user;
@@ -151,24 +153,26 @@
private String logURL;
private ContainerStatus finishedStatus;
+
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId,
- EventHandler handler,
- ContainerAllocationExpirer containerAllocationExpirer,
- String user) {
+ String user, RMContext rmContext) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
this.container = container;
this.appAttemptId = appAttemptId;
- this.eventHandler = handler;
- this.containerAllocationExpirer = containerAllocationExpirer;
this.user = user;
this.startTime = System.currentTimeMillis();
+ this.rmContext = rmContext;
+ this.eventHandler = rmContext.getDispatcher().getEventHandler();
+ this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
+
+ rmContext.getRMApplicationHistoryWriter().containerStarted(this);
}
@Override
@@ -386,6 +390,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+
+ container.rmContext.getRMApplicationHistoryWriter()
+ .containerFinished(container);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
index 6f6dffc..ee10821 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
@@ -232,9 +232,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
if (rmContainer == null) {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer(),
- appSchedulingInfo.getUser());
+ node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
Resources.addTo(currentReservation, container.getResource());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 74f897d..3bd2d7c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -121,9 +121,8 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this
- .getApplicationAttemptId(), node.getNodeID(), this.rmContext
- .getDispatcher().getEventHandler(), this.rmContext
- .getContainerAllocationExpirer(), appSchedulingInfo.getUser());
+ .getApplicationAttemptId(), node.getNodeID(),
+ appSchedulingInfo.getUser(), this.rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
index 863d51b..152d37c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
@@ -272,9 +272,8 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
- getApplicationAttemptId(), node.getNodeID(), rmContext
- .getDispatcher().getEventHandler(), rmContext
- .getContainerAllocationExpirer(), appSchedulingInfo.getUser());
+ getApplicationAttemptId(), node.getNodeID(),
+ appSchedulingInfo.getUser(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 6698412..414dca0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -99,9 +100,10 @@ public static RMContext mockRMContext(int n, long time) {
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
return new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
- null, null, null, null, null) {
+ null, null, null, null, null, writer) {
@Override
public ConcurrentMap getRMApps() {
return map;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 9ec82c4..f8f763a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -18,19 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
-import java.util.HashSet;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
@@ -47,10 +47,9 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
@@ -78,6 +77,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -510,6 +510,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
.thenReturn(queInfo);
when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean()))
.thenThrow(new IOException("queue does not exist"));
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+ when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
ConcurrentHashMap apps = getRMApps(rmContext,
yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 82046c7..a966efd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -101,7 +101,7 @@ public void setUp() throws Exception {
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
- mock(DelegationTokenRenewer.class), null, null, null, null);
+ mock(DelegationTokenRenewer.class), null, null, null, null, null);
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
new file mode 100644
index 0000000..55f50ca
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRMApplicationHistoryWriter {
+
+ private static int MAX_RETRIES = 10;
+
+ private RMApplicationHistoryWriter writer;
+ private ApplicationHistoryStore store;
+ private List dispatchers =
+ new ArrayList();
+
+ @Before
+ public void setup() {
+ store = new MemoryApplicationHistoryStore();
+ Configuration conf = new Configuration();
+ writer = new RMApplicationHistoryWriter() {
+
+ @Override
+ protected ApplicationHistoryStore createApplicationHistoryStore(
+ Configuration conf) {
+ return store;
+ }
+
+ @Override
+ protected Dispatcher createDispatcher(Configuration conf) {
+ HomoDispatcherVector dispatcher = new HomoDispatcherVector(conf.getInt(
+ YarnConfiguration.RM_HISTORY_WRITER_COMPSITE_DISPATCHER_SIZE,
+ YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_COMPSITE_DISPATCHER_SIZE));
+ dispatcher.setDrainEventsOnStop();
+ return dispatcher;
+ }
+
+ class HomoDispatcherVector extends
+ RMApplicationHistoryWriter.HomoDispatcherVector {
+
+ public HomoDispatcherVector(int num) {
+ super(num);
+ }
+
+ @Override
+ protected AsyncDispatcher createDispatcher() {
+ CounterDispatcher dispatcher = new CounterDispatcher();
+ dispatchers.add(dispatcher);
+ return dispatcher;
+ }
+
+ }
+ };
+ writer.init(conf);
+ writer.start();
+ }
+
+ @After
+ public void tearDown() {
+ writer.stop();
+ }
+
+ private static RMApp createRMApp(ApplicationId appId) {
+ RMApp app = mock(RMApp.class);
+ when(app.getApplicationId()).thenReturn(appId);
+ when(app.getName()).thenReturn("test app");
+ when(app.getApplicationType()).thenReturn("test app type");
+ when(app.getUser()).thenReturn("test user");
+ when(app.getQueue()).thenReturn("test queue");
+ when(app.getSubmitTime()).thenReturn(0L);
+ when(app.getStartTime()).thenReturn(1L);
+ when(app.getFinishTime()).thenReturn(2L);
+ when(app.getDiagnostics()).thenReturn(
+ new StringBuilder("test diagnostics info"));
+ when(app.getFinalApplicationStatus()).thenReturn(
+ FinalApplicationStatus.UNDEFINED);
+ when(app.createApplicationState())
+ .thenReturn(YarnApplicationState.FINISHED);
+ return app;
+ }
+
+ private static RMAppAttempt createRMAppAttempt(
+ ApplicationAttemptId appAttemptId) {
+ RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+ when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
+ when(appAttempt.getHost()).thenReturn("test host");
+ when(appAttempt.getRpcPort()).thenReturn(-100);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(
+ ContainerId.newInstance(appAttemptId, 1));
+ when(appAttempt.getMasterContainer()).thenReturn(container);
+ when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
+ when(appAttempt.getTrackingUrl()).thenReturn("test url");
+ when(appAttempt.getFinalApplicationStatus()).thenReturn(
+ FinalApplicationStatus.UNDEFINED);
+ when(appAttempt.createApplicationAttemptState()).thenReturn(
+ YarnApplicationAttemptState.FINISHED);
+ return appAttempt;
+ }
+
+ private static RMContainer createRMContainer(
+ ContainerId containerId) {
+ RMContainer container = mock(RMContainer.class);
+ when(container.getContainerId()).thenReturn(containerId);
+ when(container.getAllocatedNode()).thenReturn(
+ NodeId.newInstance("test host", -100));
+ when(container.getAllocatedResource()).thenReturn(
+ Resource.newInstance(-1, -1));
+ when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
+ when(container.getStartTime()).thenReturn(0L);
+ when(container.getFinishTime()).thenReturn(1L);
+ when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
+ when(container.getLogURL()).thenReturn("test log url");
+ when(container.getContainerExitStatus()).thenReturn(-1);
+ when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
+ return container;
+ }
+
+ @Test
+ public void testWriteApplication() throws Exception {
+ RMApp app = createRMApp(ApplicationId.newInstance(0, 1));
+
+ writer.applicationStarted(app);
+ ApplicationHistoryData appHD = null;
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ appHD = store.getApplication(ApplicationId.newInstance(0, 1));
+ if (appHD != null) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertNotNull(appHD);
+ Assert.assertEquals("test app", appHD.getApplicationName());
+ Assert.assertEquals("test app type", appHD.getApplicationType());
+ Assert.assertEquals("test user", appHD.getUser());
+ Assert.assertEquals("test queue", appHD.getQueue());
+ Assert.assertEquals(0L, appHD.getSubmitTime());
+ Assert.assertEquals(1L, appHD.getStartTime());
+
+ writer.applicationFinished(app);
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ appHD = store.getApplication(ApplicationId.newInstance(0, 1));
+ if (appHD.getYarnApplicationState() != null) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertEquals(2L, appHD.getFinishTime());
+ Assert.assertEquals("test diagnostics info", appHD.getDiagnosticsInfo());
+ Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+ appHD.getFinalApplicationStatus());
+ Assert.assertEquals(YarnApplicationState.FINISHED,
+ appHD.getYarnApplicationState());
+ }
+
+ @Test
+ public void testWriteApplicationAttempt() throws Exception {
+ RMAppAttempt appAttempt = createRMAppAttempt(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1));
+ writer.applicationAttemptStarted(appAttempt);
+ ApplicationAttemptHistoryData appAttemptHD = null;
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ appAttemptHD =
+ store.getApplicationAttempt(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1));
+ if (appAttemptHD != null) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertNotNull(appAttemptHD);
+ Assert.assertEquals("test host", appAttemptHD.getHost());
+ Assert.assertEquals(-100, appAttemptHD.getRPCPort());
+ Assert.assertEquals(
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1),
+ appAttemptHD.getMasterContainerId());
+
+ writer.applicationAttemptFinished(appAttempt);
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ appAttemptHD =
+ store.getApplicationAttempt(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1));
+ if (appAttemptHD.getYarnApplicationAttemptState() != null) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertEquals("test diagnostics info",
+ appAttemptHD.getDiagnosticsInfo());
+ Assert.assertEquals("test url", appAttemptHD.getTrackingURL());
+ Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+ appAttemptHD.getFinalApplicationStatus());
+ Assert.assertEquals(YarnApplicationAttemptState.FINISHED,
+ appAttemptHD.getYarnApplicationAttemptState());
+ }
+
+ @Test
+ public void testWriteContainer() throws Exception {
+ RMContainer container = createRMContainer(
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1));
+ writer.containerStarted(container);
+ ContainerHistoryData containerHD = null;
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ containerHD =
+ store.getContainer(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1));
+ if (containerHD != null) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertNotNull(containerHD);
+ Assert.assertEquals(NodeId.newInstance("test host", -100),
+ containerHD.getAssignedNode());
+ Assert.assertEquals(Resource.newInstance(-1, -1),
+ containerHD.getAllocatedResource());
+ Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority());
+ Assert.assertEquals(0L, container.getStartTime());
+
+ writer.containerFinished(container);
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ containerHD =
+ store.getContainer(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1));
+ if (containerHD.getContainerState() != null) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ Assert.assertEquals("test diagnostics info",
+ containerHD.getDiagnosticsInfo());
+ Assert.assertEquals("test log url", containerHD.getLogURL());
+ Assert.assertEquals(-1, containerHD.getContainerExitStatus());
+ Assert.assertEquals(ContainerState.COMPLETE,
+ containerHD.getContainerState());
+ }
+
+ @Test
+ public void testParallelWrite() throws Exception {
+ List appIds = new ArrayList();
+ for (int i = 0; i < 10; ++i) {
+ Random rand = new Random(i);
+ ApplicationId appId = ApplicationId.newInstance(0, rand.nextInt());
+ appIds.add(appId);
+ RMApp app = createRMApp(appId);
+ writer.applicationStarted(app);
+ for (int j = 1; j <= 10; ++j) {
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, j);
+ RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
+ writer.applicationAttemptStarted(appAttempt);
+ for (int k = 1; k <= 10; ++k) {
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
+ RMContainer container = createRMContainer(containerId);
+ writer.containerStarted(container);
+ writer.containerFinished(container);
+ }
+ writer.applicationAttemptFinished(appAttempt);
+ }
+ writer.applicationFinished(app);
+ }
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) {
+ break;
+ } else {
+ Thread.sleep(500);
+ }
+ }
+ Assert.assertTrue(allEventsHandled(20 * 10 * 10 + 20 * 10 + 20));
+ // Validate all events of one application are handled by one dispatcher
+ for (ApplicationId appId : appIds) {
+ Assert.assertTrue(handledByOne(appId));
+ }
+ }
+
+ private boolean allEventsHandled(int expected) {
+ int actual = 0;
+ for (CounterDispatcher dispatcher : dispatchers) {
+ for (Integer count : dispatcher.counts.values()) {
+ actual += count;
+ }
+ }
+ return actual == expected;
+ }
+
+ private boolean handledByOne(ApplicationId appId) {
+ int count = 0;
+ for (CounterDispatcher dispatcher : dispatchers) {
+ if (dispatcher.counts.containsKey(appId)) {
+ ++count;
+ }
+ }
+ return count == 1;
+ }
+
+ private static class CounterDispatcher extends AsyncDispatcher {
+
+ private Map counts =
+ new HashMap();
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected void dispatch(Event event) {
+ if (event instanceof WritingApplicationHistoryEvent) {
+ WritingApplicationHistoryEvent ashEvent =
+ (WritingApplicationHistoryEvent) event;
+ switch (ashEvent.getType()) {
+ case WRITING_APP_START:
+ increment(((WritingApplicationStartEvent) event).getApplicationId());
+ break;
+ case WRITING_APP_FINISH:
+ increment(((WritingApplicationFinishEvent) event)
+ .getApplicationId());
+ break;
+ case WRITING_APP_ATTEMPT_START:
+ increment(((WritingApplicationAttemptStartEvent) event)
+ .getApplicationAttemptId().getApplicationId());
+ break;
+ case WRITING_APP_ATTEMPT_FINISH:
+ increment(((WritingApplicationAttemptFinishEvent) event)
+ .getApplicationAttemptId().getApplicationId());
+ break;
+ case WRITING_CONTAINER_START:
+ increment(((WritingContainerStartEvent) event).getContainerId()
+ .getApplicationAttemptId().getApplicationId());
+ break;
+ case WRITING_CONTAINER_FINISH:
+ increment(((WritingContainerFinishEvent) event).getContainerId()
+ .getApplicationAttemptId().getApplicationId());
+ break;
+ }
+ }
+ super.dispatch(event);
+ }
+
+ private void increment(ApplicationId appId) {
+ Integer val = counts.get(appId);
+ if (val == null) {
+ counts.put(appId, 1);
+ } else {
+ counts.put(appId, val + 1);
+ }
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
index a884552..756bf45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
@@ -72,7 +72,7 @@ public void setUp() {
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
RMContext context = new RMContextImpl(dispatcher, null,
- null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
index cbb2374..455fa78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
@@ -71,7 +71,7 @@ public void setUp() {
new TestRMNodeEventDispatcher());
RMContext context = new RMContextImpl(dispatcher, null,
- null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
index ddb7a90..4f94695 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
@@ -71,7 +71,7 @@ public void handle(Event event) {
RMContext context =
new RMContextImpl(dispatcher, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf), null);
+ new NMTokenSecretManagerInRM(conf), null, null);
dispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(context));
NodesListManager nodesListManager = new NodesListManager(context);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index b5f4992..ed415c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -22,6 +22,8 @@
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -57,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -95,6 +98,7 @@
private static int appId = 1;
private DrainDispatcher rmDispatcher;
private RMStateStore store;
+ private RMApplicationHistoryWriter writer;
private YarnScheduler scheduler;
// ignore all the RM application attempt events
@@ -190,13 +194,15 @@ public void setUp() throws Exception {
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
+ writer = mock(RMApplicationHistoryWriter.class);
this.rmContext =
new RMContextImpl(rmDispatcher, store,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM());
+ new ClientToAMTokenSecretManagerInRM(),
+ writer);
rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher(this.rmContext));
@@ -345,6 +351,7 @@ private void sendAppUpdateSavedEvent(RMApp application) {
protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
+ verify(writer).applicationStarted(any(RMApp.class));
// NEW => NEW_SAVING event RMAppEventType.START
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
@@ -466,6 +473,9 @@ public void testUnmanagedApp() throws IOException {
Assert.assertTrue("Finished app missing diagnostics",
application.getDiagnostics().indexOf(diagMsg) != -1);
+ // reset the counter of Mockito.verify
+ reset(writer);
+
// test app fails after 1 app attempt failure
LOG.info("--- START: testUnmanagedAppFailPath ---");
application = testCreateAppRunning(subContext);
@@ -505,6 +515,7 @@ public void testAppNewKill() throws IOException {
application.handle(event);
rmDispatcher.await();
assertKilled(application);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@@ -519,6 +530,7 @@ public void testAppNewReject() throws IOException {
application.handle(event);
rmDispatcher.await();
assertFailed(application, rejectedText);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@@ -532,6 +544,7 @@ public void testAppNewSavingKill() throws IOException {
application.handle(event);
rmDispatcher.await();
assertKilled(application);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@@ -546,6 +559,7 @@ public void testAppNewSavingReject() throws IOException {
application.handle(event);
rmDispatcher.await();
assertFailed(application, rejectedText);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@@ -560,6 +574,7 @@ public void testAppSubmittedRejected() throws IOException {
application.handle(event);
rmDispatcher.await();
assertFailed(application, rejectedText);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@@ -572,6 +587,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException {
application.handle(event);
rmDispatcher.await();
assertAppAndAttemptKilled(application);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@@ -604,6 +620,7 @@ public void testAppAcceptedFailed() throws IOException {
application.handle(event);
rmDispatcher.await();
assertFailed(application, ".*" + message + ".*Failing the application.*");
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@@ -616,6 +633,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
application.handle(event);
rmDispatcher.await();
assertAppAndAttemptKilled(application);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@@ -637,6 +655,10 @@ public void testAppRunningKill() throws IOException {
assertAppState(RMAppState.FINAL_SAVING, application);
assertKilled(application);
+
+ // KillAppAndAttemptTransition is gone through, writing application finish
+ // is delayed to BaseFinalTransition, which is not executed.
+ verify(writer, never()).applicationFinished(any(RMApp.class));
}
@Test
@@ -688,6 +710,7 @@ public void testAppRunningFailed() throws IOException {
application.handle(event);
rmDispatcher.await();
assertFailed(application, ".*Failing the application.*");
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@@ -701,6 +724,10 @@ public void testAppFinishingKill() throws IOException {
application.handle(event);
rmDispatcher.await();
assertAppState(RMAppState.FINISHED, application);
+
+ // KillAppAndAttemptTransition is gone through, writing application finish
+ // is delayed to BaseFinalTransition, which is not executed.
+ verify(writer, never()).applicationFinished(any(RMApp.class));
}
// While App is at FINAL_SAVING, Attempt_Finished event may come before
@@ -745,6 +772,7 @@ public void testAppFinishedFinished() throws IOException {
StringBuilder diag = application.getDiagnostics();
Assert.assertEquals("application diagnostics is not correct",
"", diag.toString());
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@@ -775,6 +803,7 @@ public void testAppFailedFailed() throws IOException {
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
+ verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@@ -830,6 +859,10 @@ public void testAppKilledKilled() throws IOException {
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
+
+ // KillAppAndAttemptTransition is gone through, writing application finish
+ // is delayed to BaseFinalTransition, which is not executed.
+ verify(writer, never()).applicationFinished(any(RMApp.class));
}
@Test
@@ -853,6 +886,9 @@ public void testClientTokens() throws Exception {
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNull(report.getClientToAMToken());
+ // reset the counter of Mockito.verify
+ reset(writer);
+
app = testCreateAppRunning(null);
rmDispatcher.await();
assertAppState(RMAppState.RUNNING, app);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index b9fc15f..d3092eb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -116,6 +117,8 @@
private ApplicationMasterLauncher applicationMasterLauncher;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
+ private RMApplicationHistoryWriter writer;
+
private RMStateStore store;
private RMApp application;
@@ -204,13 +207,15 @@ public void setUp() throws Exception {
mock(ContainerAllocationExpirer.class);
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
amFinishingMonitor = mock(AMLivelinessMonitor.class);
+ writer = mock(RMApplicationHistoryWriter.class);
rmContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, amRMTokenManager,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- clientToAMTokenManager);
+ clientToAMTokenManager,
+ writer);
store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store);
@@ -368,6 +373,7 @@ private void testAppAttemptKilledState(Container amContainer,
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+ verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
verifyAttemptFinalStateSaved();
}
@@ -445,6 +451,7 @@ private void testAppAttemptFailedState(Container container,
// Check events
verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+ verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
verifyAttemptFinalStateSaved();
}
@@ -480,6 +487,7 @@ private void testAppAttemptRunningState(Container container,
assertEquals(getProxyUrl(applicationAttempt),
applicationAttempt.getTrackingUrl());
}
+ verify(writer).applicationAttemptStarted(any(RMAppAttempt.class));
// TODO - need to add more checks relevant to this state
}
@@ -526,6 +534,7 @@ private void testAppAttemptFinishedState(Container container,
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+ verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index 11873f3..4050493 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -19,9 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -36,6 +39,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -48,12 +53,10 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl {
- @SuppressWarnings("resource")
@Test
public void testReleaseWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
- EventHandler eventHandler = drainDispatcher.getEventHandler();
EventHandler appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
@@ -74,19 +77,24 @@ public void testReleaseWhileRunning() {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
- nodeId, eventHandler, expirer, "user");
+ nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
+ verify(writer).containerStarted(any(RMContainer.class));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
drainDispatcher.await();
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
-
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED));
drainDispatcher.await();
@@ -114,6 +122,7 @@ public void testReleaseWhileRunning() {
assertEquals(ContainerExitStatus.ABORTED,
rmContainer.getContainerExitStatus());
assertEquals(ContainerState.COMPLETE, rmContainer.getContainerState());
+ verify(writer).containerFinished(any(RMContainer.class));
ArgumentCaptor captor = ArgumentCaptor
.forClass(RMAppAttemptContainerFinishedEvent.class);
@@ -130,12 +139,10 @@ public void testReleaseWhileRunning() {
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
}
- @SuppressWarnings("resource")
@Test
public void testExpireWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
- EventHandler eventHandler = drainDispatcher.getEventHandler();
EventHandler appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
@@ -156,13 +163,19 @@ public void testExpireWhileRunning() {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
- nodeId, eventHandler, expirer, "user");
+ nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
+ verify(writer).containerStarted(any(RMContainer.class));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
@@ -191,5 +204,6 @@ public void testExpireWhileRunning() {
containerStatus, RMContainerEventType.EXPIRE));
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+ verify(writer, never()).containerFinished(any(RMContainer.class));
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 627fae8..4fd083b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -345,7 +345,7 @@ public void testRefreshQueues() throws Exception {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM()));
+ new ClientToAMTokenSecretManagerInRM(), null));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
conf.setCapacity(A, 80f);
@@ -444,7 +444,7 @@ public void testParseQueue() throws IOException {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM()));
+ new ClientToAMTokenSecretManagerInRM(), null));
}
@Test
@@ -457,7 +457,7 @@ public void testReconnectedNode() throws Exception {
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
- new ClientToAMTokenSecretManagerInRM()));
+ new ClientToAMTokenSecretManagerInRM(), null));
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
@@ -484,7 +484,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM()));
+ new ClientToAMTokenSecretManagerInRM(), null));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
// Add a new queue b4
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 5943c4c..d509771 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -41,8 +41,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -248,14 +248,18 @@ public void testSortedQueues() throws Exception {
ContainerAllocationExpirer expirer =
mock(ContainerAllocationExpirer.class);
DrainDispatcher drainDispatcher = new DrainDispatcher();
- EventHandler eventHandler = drainDispatcher.getEventHandler();
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
app_0.getApplicationId(), 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
Container container=TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB), priority);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
- node_0.getNodeID(), eventHandler, expirer, "user");
+ node_0.getNodeID(), "user", rmContext);
// Assign {1,2,3,4} 1GB containers respectively to queues
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index c86d6b3..21c446a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -47,7 +47,7 @@ public void testQueueParsing() throws Exception {
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
null, null, null, null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM()));
+ new ClientToAMTokenSecretManagerInRM(), null));
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index b974528..db28dca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -84,12 +85,13 @@ public EventHandler getEventHandler() {
new ContainerAllocationExpirer(nullDispatcher);
Configuration conf = new Configuration();
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext =
new RMContextImpl(nullDispatcher, cae, null, null, null,
new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM());
+ new ClientToAMTokenSecretManagerInRM(), writer);
return rmContext;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 2384590..325d27d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -20,6 +20,8 @@
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -55,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -136,8 +139,9 @@ public void testFifoSchedulerCapacityWhenNoNMs() {
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
- null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, writer);
FifoScheduler schedular = new FifoScheduler();
schedular.reinitialize(new Configuration(), rmContext);
@@ -169,8 +173,9 @@ public void testNodeLocalAssignment() throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null);
+ null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
FifoScheduler scheduler = new FifoScheduler();
scheduler.reinitialize(new Configuration(), rmContext);
@@ -229,8 +234,9 @@ public void testUpdateResourceOnNode() throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null);
+ null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
index aa2d6c6..97cc495 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
@@ -161,7 +161,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
deactivatedNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(null, null, null, null,
- null, null, null, null, null) {
+ null, null, null, null, null, null) {
@Override
public ConcurrentMap getRMApps() {
return applicationsMaps;
@@ -204,7 +204,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM()));
+ new ClientToAMTokenSecretManagerInRM(), null));
return cs;
}