getRMApps() {
return this.applications;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 841f387..b52c4b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
@@ -195,9 +196,18 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
-
+
+ RMApplicationHistoryWriter ahWriter =
+ new RMApplicationHistoryWriter();
+ try {
+ ahWriter.init(conf);
+ } catch (Exception e) {
+ LOG.error("Failed to init RMApplicationHistoryWriter", e);
+ ExitUtil.terminate(1, e);
+ }
+
this.rmContext =
- new RMContextImpl(this.rmDispatcher, rmStore,
+ new RMContextImpl(this.rmDispatcher, rmStore, ahWriter,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, this.amRmTokenSecretManager,
this.containerTokenSecretManager, this.nmTokenSecretManager,
@@ -681,6 +691,14 @@ protected void serviceStop() throws Exception {
} catch (Exception e) {
LOG.error("Error closing store.", e);
}
+
+ RMApplicationHistoryWriter ahWriter =
+ rmContext.getApplicationHistoryWriter();
+ try {
+ ahWriter.close();
+ } catch (Exception e) {
+ LOG.error("Error when closing RMApplicationHistoryWriter.", e);
+ }
}
super.serviceStop();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
new file mode 100644
index 0000000..3bb6b94
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.DummyApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerHistoryDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ *
+ * {@link ResourceManager} uses this class to write {@link RMApp},
+ * {@link RMAppAttempt} and {@link RMContainer}. These APIs are non-blocking,
+ * and just schedule a writing history event. An self-contained dispatcher will
+ * handle the event in a separate thread, and extract the exact fields that are
+ * going to be persisted. Then, the extracted information will be persisted via
+ * the implementation of {@link ApplicationHistoryWriter}.
+ *
+ */
+@Private
+@Unstable
+public class RMApplicationHistoryWriter extends AbstractService {
+
+ public static final Log LOG =
+ LogFactory.getLog(RMApplicationHistoryWriter.class);
+
+ protected AsyncDispatcher dispatcher;
+ protected ApplicationHistoryWriter writer;
+
+ public RMApplicationHistoryWriter() {
+ super(RMApplicationHistoryWriter.class.getName());
+ }
+
+ @Override
+ protected synchronized void serviceInit(
+ Configuration conf) throws Exception {
+ dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.register(
+ WritingHistoryEventType.class, new ForwardingEventHandler());
+ dispatcher.start();
+
+ boolean ahWriterEnabled = conf.getBoolean(
+ YarnConfiguration.RM_AHS_WRITER_ENABLED,
+ YarnConfiguration.DEFAULT_RM_AHS_WRITER_ENABLED);
+ if (ahWriterEnabled) {
+ try {
+ Class extends ApplicationHistoryWriter> writerClass =
+ conf.getClass(YarnConfiguration.RM_AHS_WRITER_CLASS,
+ DummyApplicationHistoryWriter.class,
+ ApplicationHistoryWriter.class);
+ writer = (ApplicationHistoryWriter) writerClass.newInstance();
+ } catch (Exception e) {
+ LOG.error("Could not instantiate ApplicationHistoryWriter: " +
+ conf.get(YarnConfiguration.RM_AHS_WRITER_CLASS,
+ DummyApplicationHistoryWriter.class.getName()), e);
+ throw e;
+ }
+ } else {
+ writer = new DummyApplicationHistoryWriter();
+ }
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ dispatcher.stop();
+
+ super.serviceStop();
+ }
+
+ /**
+ * Non-blocking API
+ * {@link ResourceManager} should call this method to record the finished
+ * {@link RMApp}.
+ * @param app the {@link RMApp} object to be recorded
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void writeApplication(RMApp app) {
+ dispatcher.getEventHandler().handle(new WritingAppHistoryEvent(app));
+ }
+
+ /**
+ * Non-blocking API
+ * {@link ResourceManager} should call this method to record the finished
+ * {@link RMAppAttempt}.
+ * @param appAttempt the {@link RMAppAttempt} object to be recorded
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void writeApplicationAttempt(RMAppAttempt appAttempt) {
+ dispatcher.getEventHandler().handle(
+ new WritingAppAttemptHistoryEvent(appAttempt));
+ }
+
+ /**
+ * Non-blocking API
+ * {@link ResourceManager} should call this method to record the finished
+ * {@link RMContainer}.
+ * @param container the {@link RMContainer} object to be recorded
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void writeContainer(RMContainer container) {
+ dispatcher.getEventHandler().handle(
+ new WritingContainerHistoryEvent(container));
+ }
+
+ protected synchronized void handleWritingHistoryEvent(
+ WritingHistoryEvent event) {
+ switch(event.getType()) {
+ case WRITING_APP:
+ WritingAppHistoryEvent appEvent = (WritingAppHistoryEvent) event;
+ RMApp app = appEvent.getApplication();
+ ApplicationHistoryData appHD = new ApplicationHistoryDataPBImpl();
+ assert app.getApplicationId() instanceof ApplicationIdPBImpl;
+ appHD.setApplicationId(app.getApplicationId());
+ appHD.setApplicationType(app.getApplicationType());
+ appHD.setApplicationName(app.getName());
+ appHD.setQueue(app.getQueue());
+ appHD.setUser(app.getUser());
+ appHD.setSubmitTime(app.getSubmitTime());
+ appHD.setStartTime(app.getStartTime());
+ appHD.setFinishTime(app.getFinishTime());
+ appHD.setDiagnosticsInfo(app.getDiagnostics().toString());
+ appHD.setFinalApplicationStatus(app.getFinalApplicationStatus());
+ try {
+ LOG.info("Writing the history data for Application " +
+ appHD.getApplicationId());
+ writer.writeApplication(appHD);
+ } catch (Throwable t) {
+ LOG.error("Failed to write the history data for Application " +
+ appHD.getApplicationId());
+ }
+ break;
+ case WRITING_APP_ATTEMPT:
+ WritingAppAttemptHistoryEvent appAttemptEvent =
+ (WritingAppAttemptHistoryEvent) event;
+ RMAppAttempt appAttempt = appAttemptEvent.getApplicationAttempt();
+ ApplicationAttemptHistoryData appAttemptHD =
+ new ApplicationAttemptHistoryDataPBImpl();
+ assert appAttempt.getAppAttemptId()
+ instanceof ApplicationAttemptIdPBImpl;
+ appAttemptHD.setApplicationAttemptId(appAttempt.getAppAttemptId());
+ appAttemptHD.setHost(appAttempt.getHost());
+ appAttemptHD.setRPCPort(appAttempt.getRpcPort());
+ assert appAttempt.getMasterContainer().getId()
+ instanceof ContainerIdPBImpl;
+ appAttemptHD.setMasterContainerId(
+ appAttempt.getMasterContainer().getId());
+ appAttemptHD.setDiagnosticsInfo(appAttempt.getDiagnostics());
+ appAttemptHD.setTrackingURL(appAttempt.getTrackingUrl());
+ appAttemptHD.setFinalApplicationStatus(
+ appAttempt.getFinalApplicationStatus());
+ try {
+ LOG.info("Writing the history data for ApplicationAttempt " +
+ appAttemptHD.getApplicationAttemptId());
+ writer.writeApplicationAttempt(appAttemptHD);
+ } catch (Throwable t) {
+ LOG.error("Failed to write the history data for ApplicationAttempt "
+ + appAttemptHD.getApplicationAttemptId());
+ }
+ break;
+ case WRITING_CONTAINER:
+ WritingContainerHistoryEvent containerEvent =
+ (WritingContainerHistoryEvent) event;
+ RMContainer container = containerEvent.getContainer();
+ ContainerHistoryData containerHD = new ContainerHistoryDataPBImpl();
+ // TODO: RMContainer needs to record more information to fill the blank
+ // fields of ContainerIdPBImpl bellow
+ assert container.getContainerId() instanceof ContainerIdPBImpl;
+ containerHD.setContainerId(container.getContainerId());
+ containerHD.setAllocatedResource(null);
+ containerHD.setAssignedNode(null);
+ containerHD.setPriority(null);
+ containerHD.setStartTime(0);
+ containerHD.setFinishTime(0);
+ containerHD.setDiagnosticsInfo(null);
+ containerHD.setLogURL(null);
+ containerHD.setFinalContainerStatus(ContainerState.COMPLETE);
+ try {
+ LOG.info("Writing the history data for Container " +
+ containerHD.getContainerId());
+ writer.writeContainer(containerHD);
+ } catch (Throwable t) {
+ LOG.error("Failed to write the history data for Container " +
+ containerHD.getContainerId());
+ }
+ break;
+ default:
+ LOG.error("Unknown WritingHistoryEvent type: " + event.getType());
+ }
+ }
+
+ /**
+ * EventHandler implementation which forward events to HistoryWriter
+ * Making use of it, HistoryWriter can avoid to have a public handle method
+ */
+ protected final class ForwardingEventHandler
+ implements EventHandler {
+
+ @Override
+ public void handle(WritingHistoryEvent event) {
+ handleWritingHistoryEvent(event);
+ }
+
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingAppAttemptHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingAppAttemptHistoryEvent.java
new file mode 100644
index 0000000..771d46a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingAppAttemptHistoryEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+
+public class WritingAppAttemptHistoryEvent extends WritingHistoryEvent {
+
+ private RMAppAttempt appAttempt;
+
+ public WritingAppAttemptHistoryEvent(RMAppAttempt appAttempt) {
+ super(WritingHistoryEventType.WRITING_APP_ATTEMPT);
+ this.appAttempt = appAttempt;
+ }
+
+ public RMAppAttempt getApplicationAttempt() {
+ return appAttempt;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingAppHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingAppHistoryEvent.java
new file mode 100644
index 0000000..9dfedd4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingAppHistoryEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+
+
+public class WritingAppHistoryEvent extends WritingHistoryEvent {
+
+ private RMApp app;
+
+ public WritingAppHistoryEvent(RMApp app) {
+ super(WritingHistoryEventType.WRITING_APP);
+ this.app = app;
+ }
+
+ public RMApp getApplication() {
+ return app;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerHistoryEvent.java
new file mode 100644
index 0000000..fa932e1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerHistoryEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+public class WritingContainerHistoryEvent extends WritingHistoryEvent {
+
+ private RMContainer container;
+
+ public WritingContainerHistoryEvent(RMContainer container) {
+ super(WritingHistoryEventType.WRITING_CONTAINER);
+ this.container = container;
+ }
+
+ public RMContainer getContainer() {
+ return container;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEvent.java
new file mode 100644
index 0000000..46363f3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEvent.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+
+public class WritingHistoryEvent extends AbstractEvent {
+
+ public WritingHistoryEvent(WritingHistoryEventType type) {
+ super(type);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java
new file mode 100644
index 0000000..5070599
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+
+public enum WritingHistoryEventType {
+ WRITING_APP,
+ WRITING_APP_ATTEMPT,
+ WRITING_CONTAINER
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index f2ac3f5..7f96b11 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -716,6 +716,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
+ app.rmContext.getApplicationHistoryWriter().writeApplication(app);
};
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 11fdd94..11a7a22 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -778,6 +778,9 @@ public void transition(RMAppAttemptImpl appAttempt,
rejectedEvent.getApplicationAttemptId().getApplicationId(),
message)
);
+
+ appAttempt.rmContext.getApplicationHistoryWriter()
+ .writeApplicationAttempt(appAttempt);
}
}
@@ -910,6 +913,9 @@ public void transition(RMAppAttemptImpl appAttempt,
// Remove the AppAttempt from the AMRMTokenSecretManager
appAttempt.rmContext.getAMRMTokenSecretManager()
.applicationMasterFinished(appAttemptId);
+
+ appAttempt.rmContext.getApplicationHistoryWriter()
+ .writeApplicationAttempt(appAttempt);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index d44fd3f..7a747b0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -133,6 +134,7 @@
private final ApplicationAttemptId appAttemptId;
private final NodeId nodeId;
private final Container container;
+ private final RMContext rmContext;
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
@@ -142,15 +144,15 @@
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId,
- EventHandler handler,
- ContainerAllocationExpirer containerAllocationExpirer) {
+ RMContext rmContext) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
this.container = container;
this.appAttemptId = appAttemptId;
- this.eventHandler = handler;
- this.containerAllocationExpirer = containerAllocationExpirer;
+ this.rmContext = rmContext;
+ this.eventHandler = rmContext.getDispatcher().getEventHandler();
+ this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@@ -291,6 +293,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+
+ container.rmContext.getApplicationHistoryWriter()
+ .writeContainer(container);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index a261dbf..1ec38a0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -276,9 +276,7 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this
- .getApplicationAttemptId(), node.getNodeID(), this.rmContext
- .getDispatcher().getEventHandler(), this.rmContext
- .getContainerAllocationExpirer());
+ .getApplicationAttemptId(), node.getNodeID(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
@@ -401,8 +399,7 @@ public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priorit
if (rmContainer == null) {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer());
+ node.getNodeID(), rmContext);
Resources.addTo(currentReservation, container.getResource());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
index 670e961..5b416c4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
@@ -326,8 +326,7 @@ public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
if (rmContainer == null) {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer());
+ node.getNodeID(), rmContext);
Resources.addTo(currentReservation, container.getResource());
@@ -539,9 +538,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
- getApplicationAttemptId(), node.getNodeID(), rmContext
- .getDispatcher().getEventHandler(), rmContext
- .getContainerAllocationExpirer());
+ getApplicationAttemptId(), node.getNodeID(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 6698412..7be9238 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -99,7 +99,7 @@ public static RMContext mockRMContext(int n, long time) {
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
- return new RMContextImpl(rmDispatcher,
+ return new RMContextImpl(rmDispatcher, null,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null) {
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index fbec326..9dad521 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -83,7 +83,7 @@ public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
- new RMContextImpl(rmDispatcher, null, null, null,
+ new RMContextImpl(rmDispatcher, null, null, null, null,
mock(DelegationTokenRenewer.class), null, null, null, null);
scheduler = mock(YarnScheduler.class);
doAnswer(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
new file mode 100644
index 0000000..beb689c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.ahs;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestRMApplicationHistoryWriter {
+
+ private RMApplicationHistoryWriter rmWriter;
+ private MyApplicationHistoryWriter writer;
+ private DrainDispatcher dispatcher;
+
+ @Before
+ public void setup() {
+ YarnConfiguration conf = new YarnConfiguration();
+ rmWriter = new RMApplicationHistoryWriter() {
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+
+ dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.register(
+ WritingHistoryEventType.class, new ForwardingEventHandler());
+ dispatcher.start();
+
+ writer = new MyApplicationHistoryWriter();
+ }
+ };
+ rmWriter.init(conf);
+ writer = (MyApplicationHistoryWriter) rmWriter.writer;
+ dispatcher = (DrainDispatcher) rmWriter.dispatcher;
+ }
+
+ @After
+ public void tearDown() {
+ rmWriter.stop();
+ }
+
+ @Test
+ public void testWriteApplication() {
+ RMApp app =
+ new MockRMApp(1, System.currentTimeMillis(), RMAppState.RUNNING);
+ rmWriter.writeApplication(app);
+ dispatcher.await();
+ Assert.assertTrue(writer.appWritten.get());
+ }
+
+ @Test
+ public void testWriteApplicationAttempt() {
+ RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+ when(appAttempt.getAppAttemptId()).thenReturn(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(System.currentTimeMillis(), 1), 1));
+ when(appAttempt.getHost()).thenReturn("localhost");
+ when(appAttempt.getRpcPort()).thenReturn(9000);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1));
+ when(appAttempt.getMasterContainer()).thenReturn(container);
+ when(appAttempt.getDiagnostics()).thenReturn("N/A");
+ when(appAttempt.getTrackingUrl()).thenReturn("http://localhost/log");
+ when(appAttempt.getFinalApplicationStatus()).thenReturn(
+ FinalApplicationStatus.SUCCEEDED);
+ rmWriter.writeApplicationAttempt(appAttempt);
+ dispatcher.await();
+ Assert.assertTrue(writer.appAttemptWritten.get());
+ }
+
+ @Test
+ public void testWriteContainer() {
+ RMContainer container = mock(RMContainer.class);
+ when(container.getContainerId()).thenReturn(
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1));
+ rmWriter.writeContainer(container);
+ dispatcher.await();
+ Assert.assertTrue(writer.containerWritten.get());
+ }
+
+ private static class MyApplicationHistoryWriter
+ implements ApplicationHistoryWriter {
+
+ private AtomicBoolean appWritten = new AtomicBoolean(false);
+ private AtomicBoolean appAttemptWritten = new AtomicBoolean(false);
+ private AtomicBoolean containerWritten = new AtomicBoolean(false);
+
+ @Override
+ public void writeApplication(ApplicationHistoryData app)
+ throws Throwable {
+ appWritten.set(true);
+ }
+
+ @Override
+ public void writeApplicationAttempt(
+ ApplicationAttemptHistoryData appAttempt) throws Throwable {
+ appAttemptWritten.set(true);
+ }
+
+ @Override
+ public void writeContainer(ContainerHistoryData container)
+ throws Throwable {
+ containerWritten.set(true);
+ }
+
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
index a884552..d375fce 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
@@ -71,7 +71,7 @@ public void setUp() {
Configuration conf = new Configuration();
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
- RMContext context = new RMContextImpl(dispatcher, null,
+ RMContext context = new RMContextImpl(dispatcher, null, null,
null, null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
index ddb7a90..361cc8c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
@@ -69,7 +69,7 @@ public void handle(Event event) {
}
});
RMContext context =
- new RMContextImpl(dispatcher, null, null, null, null,
+ new RMContextImpl(dispatcher, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf), null);
dispatcher.register(RMNodeEventType.class,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index a4b7404..bcae48e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -142,8 +143,10 @@ public void setUp() throws Exception {
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
RMStateStore store = mock(RMStateStore.class);
+ RMApplicationHistoryWriter ahWriter =
+ mock(RMApplicationHistoryWriter.class);
this.rmContext =
- new RMContextImpl(rmDispatcher, store,
+ new RMContextImpl(rmDispatcher, store, ahWriter,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index cafe4f9..1137685 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -164,8 +165,10 @@ public void setUp() throws Exception {
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
amFinishingMonitor = mock(AMLivelinessMonitor.class);
Configuration conf = new Configuration();
+ RMApplicationHistoryWriter ahWriter =
+ mock(RMApplicationHistoryWriter.class);
rmContext =
- new RMContextImpl(rmDispatcher,
+ new RMContextImpl(rmDispatcher, ahWriter,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index d1262d8..daa74f2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -22,6 +22,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -34,6 +35,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -46,6 +49,7 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl {
+ @SuppressWarnings("unused")
@Test
public void testReleaseWhileRunning() {
@@ -71,8 +75,15 @@ public void testReleaseWhileRunning() {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
+ RMApplicationHistoryWriter ahWriter =
+ mock(RMApplicationHistoryWriter.class);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getApplicationHistoryWriter()).thenReturn(ahWriter);
+
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
- nodeId, eventHandler, expirer);
+ nodeId, rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
@@ -116,6 +127,7 @@ public void testReleaseWhileRunning() {
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
}
+ @SuppressWarnings("unused")
@Test
public void testExpireWhileRunning() {
@@ -141,8 +153,15 @@ public void testExpireWhileRunning() {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
+ RMApplicationHistoryWriter ahWriter =
+ mock(RMApplicationHistoryWriter.class);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getApplicationHistoryWriter()).thenReturn(ahWriter);
+
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
- nodeId, eventHandler, expirer);
+ nodeId, rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index ec486d7..02f6825 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -31,7 +31,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -308,7 +307,7 @@ public void testRefreshQueues() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+ cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
@@ -407,7 +406,7 @@ public void testParseQueue() throws IOException {
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+ cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
@@ -420,7 +419,7 @@ public void testReconnectedNode() throws Exception {
setupQueueConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
- cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
+ cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM()));
@@ -447,7 +446,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+ cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index c86d6b3..47a9c05 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -44,7 +44,7 @@ public void testQueueParsing() throws Exception {
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(conf);
- capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
+ capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, null,
null, null, null, null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 6e7fe78..336c357 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -85,7 +85,7 @@ public EventHandler getEventHandler() {
Configuration conf = new Configuration();
RMContext rmContext =
- new RMContextImpl(nullDispatcher, cae, null, null, null,
+ new RMContextImpl(nullDispatcher, null, cae, null, null, null,
new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index b71726a..f535c79 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -126,7 +126,7 @@ public void testFifoSchedulerCapacityWhenNoNMs() {
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
RMContext rmContext = new RMContextImpl(dispatcher, null,
- null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null);
FifoScheduler schedular = new FifoScheduler();
schedular.reinitialize(new Configuration(), rmContext);
@@ -159,7 +159,7 @@ public void testNodeLocalAssignment() throws Exception {
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null);
+ null, null, containerTokenSecretManager, nmTokenSecretManager, null);
FifoScheduler scheduler = new FifoScheduler();
scheduler.reinitialize(new Configuration(), rmContext);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
index aa2d6c6..a50e42f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
@@ -160,7 +160,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
for (RMNode node : deactivatedNodes) {
deactivatedNodesMap.put(node.getHostName(), node);
}
- return new RMContextImpl(null, null, null, null,
+ return new RMContextImpl(null, null, null, null, null,
null, null, null, null, null) {
@Override
public ConcurrentMap getRMApps() {
@@ -201,7 +201,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException {
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+ cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));